diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java deleted file mode 100644 index 22b61798e..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java +++ /dev/null @@ -1,68 +0,0 @@ -package eu.dnetlib.dhp.migration; - -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.io.Text; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Software; -import scala.Tuple2; - -public class ExtractEntitiesFromHDFSJob { - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString(MigrateMongoMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json"))); - parser.parseArgument(args); - - final SparkSession spark = SparkSession - .builder() - .appName(ExtractEntitiesFromHDFSJob.class.getSimpleName()) - .master(parser.get("master")) - .getOrCreate(); - - final List sourcePaths = Arrays.asList(parser.get("sourcePaths").split(",")); - final String targetPath = parser.get("graphRawPath"); - - try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { - processEntity(sc, Publication.class, sourcePaths, targetPath); - processEntity(sc, Dataset.class, sourcePaths, targetPath); - processEntity(sc, Software.class, sourcePaths, targetPath); - processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath); - processEntity(sc, Datasource.class, sourcePaths, targetPath); - processEntity(sc, Organization.class, sourcePaths, targetPath); - processEntity(sc, Project.class, sourcePaths, targetPath); - processEntity(sc, Relation.class, sourcePaths, targetPath); - } - } - - private static void processEntity(final JavaSparkContext sc, final Class clazz, final List sourcePaths, final String targetPath) { - final String type = clazz.getSimpleName().toLowerCase(); - - final JavaRDD inputRdd = sc.emptyRDD(); - sourcePaths.forEach(sourcePath -> inputRdd.union(sc.sequenceFile(sourcePath, Text.class, Text.class) - .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) - .filter(k -> isEntityType(k._1(), type)) - .map(Tuple2::_2))); - - inputRdd.saveAsTextFile(targetPath + "/" + type); - } - - private static boolean isEntityType(final String item, final String entity) { - return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity); - } -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java deleted file mode 100644 index 359fe7596..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateMongoMdstoresApplication.java +++ /dev/null @@ -1,45 +0,0 @@ -package eu.dnetlib.dhp.migration; - -import org.apache.commons.io.IOUtils; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -public class MigrateMongoMdstoresApplication { - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json"))); - parser.parseArgument(args); - - final String mongoBaseUrl = parser.get("mongoBaseUrl"); - final String mongoDb = parser.get("mongoDb"); - - final String mdFormat = parser.get("mdFormat"); - final String mdLayout = parser.get("mdLayout"); - final String mdInterpretation = parser.get("mdInterpretation"); - - final String hdfsPath = parser.get("hdfsPath"); - final String hdfsNameNode = parser.get("namenode"); - final String hdfsUser = parser.get("hdfsUser"); - - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); - - if (mdFormat.equalsIgnoreCase("oaf")) { - try (final OafMigrationExecutor mig = - new OafMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) { - mig.processMdRecords(mdFormat, mdLayout, mdInterpretation); - } - } else if (mdFormat.equalsIgnoreCase("odf")) { - try (final OdfMigrationExecutor mig = - new OdfMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) { - mig.processMdRecords(mdFormat, mdLayout, mdInterpretation); - } - } else { - throw new RuntimeException("Format not supported: " + mdFormat); - } - - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java similarity index 96% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java index 1ccfd09ef..b6ebc6b98 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java @@ -1,4 +1,14 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step1; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.asString; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listKeyValues; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty; import java.io.Closeable; import java.io.IOException; @@ -17,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication; +import eu.dnetlib.dhp.migration.utils.DbClient; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Dataset; @@ -34,7 +46,7 @@ import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable { +public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable { private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION = qualifier("sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry", "dnet:provenance_actions", "dnet:provenance_actions"); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java new file mode 100644 index 000000000..dad1278e9 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java @@ -0,0 +1,70 @@ +package eu.dnetlib.dhp.migration.step1; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication; +import eu.dnetlib.dhp.migration.utils.MdstoreClient; + +public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable { + + private static final Log log = LogFactory.getLog(MigrateMongoMdstoresApplication.class); + + private final MdstoreClient mdstoreClient; + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json"))); + parser.parseArgument(args); + + final String mongoBaseUrl = parser.get("mongoBaseUrl"); + final String mongoDb = parser.get("mongoDb"); + + final String mdFormat = parser.get("mdFormat"); + final String mdLayout = parser.get("mdLayout"); + final String mdInterpretation = parser.get("mdInterpretation"); + + final String hdfsPath = parser.get("hdfsPath"); + final String hdfsNameNode = parser.get("namenode"); + final String hdfsUser = parser.get("hdfsUser"); + + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb)) { + app.execute(mdFormat, mdLayout, mdInterpretation); + } + + } + + public MigrateMongoMdstoresApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, + final String mongoDb) throws Exception { + super(hdfsPath, hdfsNameNode, hdfsUser); + this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); + } + + public void execute(final String format, final String layout, final String interpretation) { + final Map colls = mdstoreClient.validCollections(format, layout, interpretation); + log.info("Found " + colls.size() + " mdstores"); + + for (final Entry entry : colls.entrySet()) { + log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")"); + final String currentColl = entry.getValue(); + + for (final String xml : mdstoreClient.listRecords(currentColl)) { + emit(xml, "native_" + format); + } + } + } + + @Override + public void close() throws IOException { + super.close(); + mdstoreClient.close(); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java similarity index 80% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java index 0595726d4..245d53df0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java @@ -1,20 +1,24 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step2; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.keyValue; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.oaiIProvenance; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty; -import java.io.IOException; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.dom4j.Document; -import org.dom4j.DocumentException; import org.dom4j.DocumentFactory; import org.dom4j.DocumentHelper; import org.dom4j.Node; @@ -37,11 +41,9 @@ import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { +public abstract class AbstractMdRecordToOafMapper { - protected final Map code2name = new HashMap<>(); - - protected final MdstoreClient mdstoreClient; + protected final Map code2name; protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); @@ -51,79 +53,36 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { protected static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER = qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies"); protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER = qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies"); - private static final Log log = LogFactory.getLog(AbstractMongoExecutor.class); - - public AbstractMongoExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, - final String mongoDb, final String dbUrl, final String dbUser, - final String dbPassword) throws Exception { - - super(hdfsPath, hdfsNameNode, hdfsUser); - - this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); - loadClassNames(dbUrl, dbUser, dbPassword); - - final Map nsContext = new HashMap<>(); - - registerNamespaces(nsContext); - - DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); + protected AbstractMdRecordToOafMapper(final Map code2name) { + this.code2name = code2name; } - private void loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException { + public List processMdRecord(final String xml) { + try { + final Map nsContext = new HashMap<>(); + nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); + nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); + nsContext.put("oaf", "http://namespace.openaire.eu/oaf"); + nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/"); + nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance"); + nsContext.put("dc", "http://purl.org/dc/elements/1.1/"); + nsContext.put("datacite", "http://datacite.org/schema/kernel-3"); + DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); - log.info("Loading vocabulary terms from db..."); + final Document doc = DocumentHelper.parseText(xml); - try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { - code2name.clear(); - dbClient.processResults("select code, name from class", rs -> { - try { - code2name.put(rs.getString("code"), rs.getString("name")); - } catch (final SQLException e) { - e.printStackTrace(); - } - }); + final String type = doc.valueOf("//dr:CobjCategory/@type"); + final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name")); + final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom + : keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name")); + + final DataInfo info = prepareDataInfo(doc); + final long lastUpdateTimestamp = new Date().getTime(); + + return createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp); + } catch (final Exception e) { + throw new RuntimeException(e); } - - log.info("Found " + code2name.size() + " terms."); - - } - - public void processMdRecords(final String mdFormat, final String mdLayout, final String mdInterpretation) throws DocumentException { - - log.info(String.format("Searching mdstores (format: %s, layout: %s, interpretation: %s)", mdFormat, mdLayout, mdInterpretation)); - - final Map colls = mdstoreClient.validCollections(mdFormat, mdLayout, mdInterpretation); - log.info("Found " + colls.size() + " mdstores"); - - for (final Entry entry : colls.entrySet()) { - log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")"); - final String currentColl = entry.getValue(); - - for (final String xml : mdstoreClient.listRecords(currentColl)) { - final Document doc = DocumentHelper.parseText(xml); - - final String type = doc.valueOf("//dr:CobjCategory/@type"); - final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name")); - final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom - : keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name")); - - final DataInfo info = prepareDataInfo(doc); - final long lastUpdateTimestamp = new Date().getTime(); - - for (final Oaf oaf : createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp)) { - emitOaf(oaf); - } - } - } - log.info("All Done."); - } - - protected void registerNamespaces(final Map nsContext) { - nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); - nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); - nsContext.put("oaf", "http://namespace.openaire.eu/oaf"); - nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/"); - nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance"); } protected List createOafs(final Document doc, @@ -432,10 +391,4 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { return res; } - @Override - public void close() throws IOException { - super.close(); - mdstoreClient.close(); - } - } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java new file mode 100644 index 000000000..5627a720b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java @@ -0,0 +1,170 @@ +package eu.dnetlib.dhp.migration.step2; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication; +import eu.dnetlib.dhp.migration.utils.DbClient; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; +import scala.Tuple2; + +public class GenerateEntitiesApplication { + + private static final Log log = LogFactory.getLog(GenerateEntitiesApplication.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateMongoMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/migration/generate_entities_parameters.json"))); + + parser.parseArgument(args); + + final String sourcePaths = parser.get("sourcePaths"); + final String targetPath = parser.get("targetPath"); + + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + + final SparkSession spark = SparkSession + .builder() + .appName(GenerateEntitiesApplication.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final Map code2name = loadClassNames(dbUrl, dbUser, dbPassword); + + try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { + final List existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList()); + generateEntities(sc, code2name, existingSourcePaths, targetPath); + } + } + + private static void generateEntities(final JavaSparkContext sc, + final Map code2name, + final List sourcePaths, + final String targetPath) { + + log.info("Generate entities from files:"); + sourcePaths.forEach(log::info); + + JavaRDD inputRdd = sc.emptyRDD(); + + for (final String sp : sourcePaths) { + inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class) + .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) + .map(k -> convertToListOaf(k._1(), k._2(), code2name)) + .flatMap(list -> list.iterator()) + .map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf))); + } + + inputRdd.saveAsTextFile(targetPath); + + } + + private static List convertToListOaf(final String id, final String s, final Map code2name) { + final String type = StringUtils.substringAfter(id, ":"); + + switch (type.toLowerCase()) { + case "native_oaf": + return new OafToOafMapper(code2name).processMdRecord(s); + case "native_odf": + return new OafToOafMapper(code2name).processMdRecord(s); + case "datasource": + return Arrays.asList(convertFromJson(s, Datasource.class)); + case "organization": + return Arrays.asList(convertFromJson(s, Organization.class)); + case "project": + return Arrays.asList(convertFromJson(s, Project.class)); + case "relation": + return Arrays.asList(convertFromJson(s, Relation.class)); + case "publication": + return Arrays.asList(convertFromJson(s, Publication.class)); + case "dataset": + return Arrays.asList(convertFromJson(s, Dataset.class)); + case "software": + return Arrays.asList(convertFromJson(s, Software.class)); + case "otherresearchproducts": + default: + return Arrays.asList(convertFromJson(s, OtherResearchProduct.class)); + } + + } + + private static Map loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException { + + log.info("Loading vocabulary terms from db..."); + + final Map map = new HashMap<>(); + + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + dbClient.processResults("select code, name from class", rs -> { + try { + map.put(rs.getString("code"), rs.getString("name")); + } catch (final SQLException e) { + e.printStackTrace(); + } + }); + } + + log.info("Found " + map.size() + " terms."); + + return map; + + } + + private static String convertToJson(final Oaf oaf) { + try { + return new ObjectMapper().writeValueAsString(oaf); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static Oaf convertFromJson(final String s, final Class clazz) { + try { + return new ObjectMapper().readValue(s, clazz); + } catch (final Exception e) { + log.error("Error parsing object of class: " + clazz); + log.error(s); + throw new RuntimeException(e); + } + } + + private static boolean exists(final JavaSparkContext context, final String pathToFile) { + try { + final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration()); + final Path path = new Path(pathToFile); + return hdfs.exists(path); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java similarity index 90% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java index c32568290..0ca13941d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OafMigrationExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java @@ -1,16 +1,17 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step2; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.dom4j.Document; import org.dom4j.Node; -import eu.dnetlib.dhp.migration.pace.PacePerson; +import eu.dnetlib.dhp.migration.utils.PacePerson; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Field; @@ -22,20 +23,10 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class OafMigrationExecutor extends AbstractMongoExecutor { +public class OafToOafMapper extends AbstractMdRecordToOafMapper { - private static final Log log = LogFactory.getLog(OafMigrationExecutor.class); - - public OafMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb, - final String dbUrl, final String dbUser, - final String dbPassword) throws Exception { - super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword); - } - - @Override - protected void registerNamespaces(final Map nsContext) { - super.registerNamespaces(nsContext); - nsContext.put("dc", "http://purl.org/dc/elements/1.1/"); + public OafToOafMapper(final Map code2name) { + super(code2name); } @Override diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java similarity index 74% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java index 457534085..01c2d4b07 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/OdfMigrationExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java @@ -1,4 +1,8 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step2; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty; import java.util.ArrayList; import java.util.Arrays; @@ -6,8 +10,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.dom4j.Document; import org.dom4j.Node; @@ -22,38 +24,28 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class OdfMigrationExecutor extends AbstractMongoExecutor { +public class OdfToOafMapper extends AbstractMdRecordToOafMapper { - private static final Log log = LogFactory.getLog(OdfMigrationExecutor.class); - - public OdfMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb, - final String dbUrl, final String dbUser, - final String dbPassword) throws Exception { - super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword); - } - - @Override - protected void registerNamespaces(final Map nsContext) { - super.registerNamespaces(nsContext); - nsContext.put("dc", "http://datacite.org/schema/kernel-3"); + public OdfToOafMapper(final Map code2name) { + super(code2name); } @Override protected List prepareTitles(final Document doc, final DataInfo info) { - return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info); + return prepareListStructProps(doc, "//datacite:title", MAIN_TITLE_QUALIFIER, info); } @Override protected List prepareAuthors(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); int pos = 1; - for (final Object o : doc.selectNodes("//dc:creator")) { + for (final Object o : doc.selectNodes("//datacite:creator")) { final Node n = (Node) o; final Author author = new Author(); - author.setFullname(n.valueOf("./dc:creatorName")); - author.setName(n.valueOf("./dc:givenName")); - author.setSurname(n.valueOf("./dc:familyName")); - author.setAffiliation(prepareListFields(doc, "./dc:affiliation", info)); + author.setFullname(n.valueOf("./datacite:creatorName")); + author.setName(n.valueOf("./datacite:givenName")); + author.setSurname(n.valueOf("./datacite:familyName")); + author.setAffiliation(prepareListFields(doc, "./datacite:affiliation", info)); author.setPid(preparePids(doc, info)); author.setRank(pos++); res.add(author); @@ -63,7 +55,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { private List preparePids(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("./dc:nameIdentifier")) { + for (final Object o : doc.selectNodes("./datacite:nameIdentifier")) { res.add(structuredProperty(((Node) o).getText(), prepareQualifier((Node) o, "./@nameIdentifierScheme", "dnet:pid_types", "dnet:pid_types"), info)); } return res; @@ -72,7 +64,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("//dc:alternateIdentifier[@alternateIdentifierType='URL']")) { + for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) { final Instance instance = new Instance(); instance.setUrl(Arrays.asList(((Node) o).getText().trim())); instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource")); @@ -98,7 +90,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List prepareRelevantDates(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("//dc:date")) { + for (final Object o : doc.selectNodes("//datacite:date")) { final String dateType = ((Node) o).valueOf("@dateType"); if (StringUtils.isBlank(dateType) && !dateType.equalsIgnoreCase("Accepted") && !dateType.equalsIgnoreCase("Issued") && !dateType.equalsIgnoreCase("Updated") && !dateType.equalsIgnoreCase("Available")) { @@ -115,32 +107,32 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List> prepareContributors(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:contributorName", info); + return prepareListFields(doc, "//datacite:contributorName", info); } @Override protected List> prepareFormats(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:format", info); + return prepareListFields(doc, "//datacite:format", info); } @Override protected Field preparePublisher(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:publisher", info); + return prepareField(doc, "//datacite:publisher", info); } @Override protected List> prepareDescriptions(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:description[@descriptionType='Abstract']", info); + return prepareListFields(doc, "//datacite:description[@descriptionType='Abstract']", info); } @Override protected List prepareSubjects(final Document doc, final DataInfo info) { - return prepareListStructProps(doc, "//dc:subject", info); + return prepareListStructProps(doc, "//datacite:subject", info); } @Override protected Qualifier prepareLanguages(final Document doc) { - return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages"); + return prepareQualifier(doc, "//datacite:language", "dnet:languages", "dnet:languages"); } @Override @@ -150,17 +142,17 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:contributor[@contributorType='ContactGroup']/dc:contributorName", info); + return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName", info); } @Override protected List> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:contributor[@contributorType='ContactPerson']/dc:contributorName", info); + return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName", info); } @Override protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) { - return prepareQualifier(doc, "//dc:format", "dnet:programming_languages", "dnet:programming_languages"); + return prepareQualifier(doc, "//datacite:format", "dnet:programming_languages", "dnet:programming_languages"); } @Override @@ -175,7 +167,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info); + return prepareListFields(doc, "//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info); } // DATASETS @@ -184,11 +176,11 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { protected List prepareDatasetGeoLocations(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("//dc:geoLocation")) { + for (final Object o : doc.selectNodes("//datacite:geoLocation")) { final GeoLocation loc = new GeoLocation(); - loc.setBox(((Node) o).valueOf("./dc:geoLocationBox")); - loc.setPlace(((Node) o).valueOf("./dc:geoLocationPlace")); - loc.setPoint(((Node) o).valueOf("./dc:geoLocationPoint")); + loc.setBox(((Node) o).valueOf("./datacite:geoLocationBox")); + loc.setPlace(((Node) o).valueOf("./datacite:geoLocationPlace")); + loc.setPoint(((Node) o).valueOf("./datacite:geoLocationPoint")); res.add(loc); } return res; @@ -201,17 +193,17 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected Field prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:date[@dateType='Updated']", info); + return prepareField(doc, "//datacite:date[@dateType='Updated']", info); } @Override protected Field prepareDatasetVersion(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:version", info); + return prepareField(doc, "//datacite:version", info); } @Override protected Field prepareDatasetSize(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:size", info); + return prepareField(doc, "//datacite:size", info); } @Override @@ -221,7 +213,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected Field prepareDatasetStorageDate(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:date[@dateType='Issued']", info); + return prepareField(doc, "//datacite:date[@dateType='Issued']", info); } @Override diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java new file mode 100644 index 000000000..222828a49 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java @@ -0,0 +1,68 @@ +package eu.dnetlib.dhp.migration.step3; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; + +public class DispatchEntitiesApplication { + + private static final Log log = LogFactory.getLog(DispatchEntitiesApplication.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateMongoMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json"))); + parser.parseArgument(args); + + final SparkSession spark = SparkSession + .builder() + .appName(DispatchEntitiesApplication.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { + + final String sourcePath = parser.get("sourcePath"); + final String targetPath = parser.get("graphRawPath"); + + processEntity(sc, Publication.class, sourcePath, targetPath); + processEntity(sc, Dataset.class, sourcePath, targetPath); + processEntity(sc, Software.class, sourcePath, targetPath); + processEntity(sc, OtherResearchProduct.class, sourcePath, targetPath); + processEntity(sc, Datasource.class, sourcePath, targetPath); + processEntity(sc, Organization.class, sourcePath, targetPath); + processEntity(sc, Project.class, sourcePath, targetPath); + processEntity(sc, Relation.class, sourcePath, targetPath); + } + } + + private static void processEntity(final JavaSparkContext sc, final Class clazz, final String sourcePath, final String targetPath) { + final String type = clazz.getSimpleName().toLowerCase(); + + log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath)); + + sc.textFile(sourcePath) + .filter(l -> isEntityType(l, type)) + .map(l -> StringUtils.substringAfter(l, "|")) + .saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ??? + } + + private static boolean isEntityType(final String line, final String type) { + return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java new file mode 100644 index 000000000..41f9f8145 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java @@ -0,0 +1,80 @@ +package eu.dnetlib.dhp.migration.utils; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.codehaus.jackson.map.ObjectMapper; + +import eu.dnetlib.dhp.schema.oaf.Oaf; + +public class AbstractMigrationApplication implements Closeable { + + private final AtomicInteger counter = new AtomicInteger(0); + + private final Text key = new Text(); + + private final Text value = new Text(); + + private final SequenceFile.Writer writer; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private static final Log log = LogFactory.getLog(AbstractMigrationApplication.class); + + public AbstractMigrationApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception { + + log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser)); + + this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer + .keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); + } + + private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException { + final Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + System.setProperty("HADOOP_USER_NAME", hdfsUser); + System.setProperty("hadoop.home.dir", "/"); + FileSystem.get(URI.create(hdfsNameNode), conf); + return conf; + } + + protected void emit(final String s, final String type) { + try { + key.set(counter.getAndIncrement() + ":" + type); + value.set(s); + writer.append(key, value); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + protected void emitOaf(final Oaf oaf) { + try { + emit(objectMapper.writeValueAsString(oaf), oaf.getClass().getSimpleName().toLowerCase()); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + + @Override + public void close() throws IOException { + writer.hflush(); + writer.close(); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java similarity index 97% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java index 9ac0089d2..43270e452 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/DbClient.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.utils; import java.io.Closeable; import java.io.IOException; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java similarity index 98% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java index 87dadfc7a..612503da7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MdstoreClient.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.utils; import java.io.Closeable; import java.io.IOException; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java similarity index 71% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java index b0db3c76f..89b344c68 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java @@ -1,24 +1,12 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.utils; -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.codehaus.jackson.map.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.ExtraInfo; @@ -26,60 +14,12 @@ import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.OAIProvenance; -import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.OriginDescription; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.utils.DHPUtils; -public class AbstractMigrationExecutor implements Closeable { - - private final AtomicInteger counter = new AtomicInteger(0); - - private final Text key = new Text(); - - private final Text value = new Text(); - - private final ObjectMapper objectMapper = new ObjectMapper(); - - private final SequenceFile.Writer writer; - - private static final Log log = LogFactory.getLog(AbstractMigrationExecutor.class); - - public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception { - - log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser)); - - this.writer = SequenceFile.createWriter(getConf(hdfsNameNode, hdfsUser), SequenceFile.Writer.file(new Path(hdfsPath)), SequenceFile.Writer - .keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); - } - - private Configuration getConf(final String hdfsNameNode, final String hdfsUser) throws IOException { - final Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsNameNode); - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - System.setProperty("HADOOP_USER_NAME", hdfsUser); - System.setProperty("hadoop.home.dir", "/"); - FileSystem.get(URI.create(hdfsNameNode), conf); - return conf; - } - - protected void emitOaf(final Oaf oaf) { - try { - key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase()); - value.set(objectMapper.writeValueAsString(oaf)); - writer.append(key, value); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - writer.hflush(); - writer.close(); - } +public class OafMapperUtils { public static KeyValue keyValue(final String k, final String v) { final KeyValue kv = new KeyValue(); @@ -247,4 +187,5 @@ public class AbstractMigrationExecutor implements Closeable { public static String asString(final Object o) { return o == null ? "" : o.toString(); } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java index 927f5641b..69e128e63 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/pace/PacePerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.migration.pace; +package eu.dnetlib.dhp.migration.utils; import java.nio.charset.Charset; import java.text.Normalizer; diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json similarity index 70% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json index 0039493e7..8c81290ca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json @@ -1,8 +1,8 @@ [ { "paramName": "s", - "paramLongName": "sourcePaths", - "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)", + "paramLongName": "sourcePath", + "paramDescription": "the source path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json new file mode 100644 index 000000000..53ee010c4 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json @@ -0,0 +1,39 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePaths", + "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)", + "paramRequired": true + }, + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the path of the target file", + "paramRequired": true + }, + { + "paramName": "pgurl", + "paramLongName": "postgresUrl", + "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", + "paramRequired": true + }, + { + "paramName": "pguser", + "paramLongName": "postgresUser", + "paramDescription": "postgres user", + "paramRequired": false + }, + { + "paramName": "pgpasswd", + "paramLongName": "postgresPassword", + "paramDescription": "postgres password", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json index 4506e2ae1..39e0dd5ac 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json @@ -18,19 +18,19 @@ "paramRequired": true }, { - "paramName": "dburl", + "paramName": "pgurl", "paramLongName": "postgresUrl", "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", "paramRequired": true }, { - "paramName": "dbuser", + "paramName": "pguser", "paramLongName": "postgresUser", "paramDescription": "postgres user", "paramRequired": false }, { - "paramName": "dbpasswd", + "paramName": "pgpasswd", "paramLongName": "postgresPassword", "paramDescription": "postgres password", "paramRequired": false diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json index 5738daa76..fc900e97d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json @@ -24,7 +24,7 @@ "paramRequired": true }, { - "paramName": "db", + "paramName": "mongodb", "paramLongName": "mongoDb", "paramDescription": "mongo database", "paramRequired": true @@ -46,23 +46,5 @@ "paramLongName": "mdInterpretation", "paramDescription": "metadata interpretation", "paramRequired": true - }, - { - "paramName": "pgurl", - "paramLongName": "postgresUrl", - "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", - "paramRequired": true - }, - { - "paramName": "pguser", - "paramLongName": "postgresUser", - "paramDescription": "postgres user", - "paramRequired": false - }, - { - "paramName": "pgpasswd", - "paramLongName": "postgresPassword", - "paramDescription": "postgres password", - "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml similarity index 50% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml index b11cddfcf..7a1ee2cae 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml @@ -1,14 +1,17 @@ - + - workingPath + migrationClaimsPathStep1 the base path to store hdfs file - graphRawPath + migrationClaimsPathStep2 + the temporary path to store entities before dispatching + + + migrationClaimsPathStep3 the graph Raw base path - postgresURL the postgres URL to access to the database @@ -22,7 +25,7 @@ the password postgres - mongourl + mongoURL mongoDB url, example: mongodb://[username:password@]host[:port] @@ -43,7 +46,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -51,143 +54,118 @@ - - + + - + - - + + ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication - -p${workingPath}/db_entities + eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication + -p${migrationClaimsPathStep1}/db_claims -n${nameNode} -u${hdfsUser} - -dburl${postgresURL} - -dbuser${postgresUser} - -dbpasswd${postgresPassword} - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication - -p${workingPath}/db_claims - -n${nameNode} - -u${hdfsUser} - -dburl${postgresURL} - -dbuser${postgresUser} - -dbpasswd${postgresPassword} + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} -aclaims - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/odf_entities - -n${nameNode} - -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} - -fODF - -lstore - -icleaned - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/oaf_entities - -n${nameNode} - -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} - -fOAF - -lstore - -icleaned - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - + - + ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/odf_claims + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationClaimsPathStep1}/odf_claims -n${nameNode} -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} + -mongourl${mongoURL} + -mongodb${mongoDb} -fODF -lstore -iclaim - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - + - + ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/oaf_claims + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationClaimsPathStep1}/oaf_claims -n${nameNode} -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} + -mongourl${mongoURL} + -mongodb${mongoDb} -fOAF -lstore -iclaim - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - + - + + + + + + + + + + ${jobTracker} ${nameNode} yarn-cluster cluster - ExtractEntities - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + GenerateClaimEntities + eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication dhp-aggregation-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" -mt yarn-cluster - -s${workingPath}/db_entities,${workingPath}/oaf_entities,${workingPath}/odf_entities - -g${graphRawPath} + -s${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims + -t${migrationClaimsPathStep2}/claim_entities + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateClaimGraph + eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationClaimsPathStep2}/claim_entities + -g${migrationClaimsPathStep3} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/config-default.xml new file mode 100644 index 000000000..51e48d8f7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml new file mode 100644 index 000000000..e27372240 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml @@ -0,0 +1,174 @@ + + + + migrationPathStep1 + the base path to store hdfs file + + + migrationPathStep2 + the temporary path to store entities before dispatching + + + migrationPathStep3 + the graph Raw base path + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication + -p${migrationPathStep1}/db_records + -n${nameNode} + -u${hdfsUser} + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/odf_records + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -icleaned + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/oaf_records + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -icleaned + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateEntities + eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records + -t${migrationPathStep2}/all_entities + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateGraph + eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep2}/all_entities + -g${migrationPathStep3} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml new file mode 100644 index 000000000..51e48d8f7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml new file mode 100644 index 000000000..7b3c5a746 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml @@ -0,0 +1,109 @@ + + + + migrationPathStep1 + the base path to store hdfs file + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication + -p${migrationPathStep1}/db_records + -n${nameNode} + -u${hdfsUser} + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/odf_records + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -icleaned + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/oaf_records + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -icleaned + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml new file mode 100644 index 000000000..51e48d8f7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml new file mode 100644 index 000000000..cd0a4025e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml @@ -0,0 +1,74 @@ + + + + migrationPathStep1 + the base path to store hdfs file + + + migrationPathStep2 + the temporary path to store entities before dispatching + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateEntities + eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records + -t${migrationPathStep2}/all_entities + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml new file mode 100644 index 000000000..51e48d8f7 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml new file mode 100644 index 000000000..8688f09d1 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml @@ -0,0 +1,60 @@ + + + + + migrationPathStep2 + the temporary path to store entities before dispatching + + + migrationPathStep3 + the graph Raw base path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateGraph + eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep2}/all_entities + -g${migrationPathStep3} + + + + + + + \ No newline at end of file