From 4b29a121b0e263cd9f1909916099b17744b63e88 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 2 Mar 2020 16:12:14 +0100 Subject: [PATCH] migration using spark in step2 --- .../step1/MigrateDbEntitiesApplication.java | 16 +- .../MigrateMongoMdstoresApplication.java | 61 +++-- .../step2/AbstractMdRecordToOafMapper.java | 123 ++++------ .../step2/GenerateEntitiesApplication.java | 170 ++++++++++++++ .../dhp/migration/step2/OafToOafMapper.java | 25 +-- .../dhp/migration/step2/OdfToOafMapper.java | 78 +++---- .../step3/DispatchEntitiesApplication.java | 72 ++---- .../utils/AbstractMigrationApplication.java | 212 ++---------------- .../dnetlib/dhp/migration/utils/DbClient.java | 2 +- .../dhp/migration/utils/MdstoreClient.java | 2 +- .../dhp/migration/utils/OafMapperUtils.java | 191 ++++++++++++++++ .../dhp/migration/utils/PacePerson.java | 2 +- .../dispatch_entities_parameters.json | 4 +- .../generate_entities_parameters.json | 39 ++++ .../migrate_db_entities_parameters.json | 6 +- .../migrate_mongo_mstores_parameters.json | 20 +- .../wfs/claims/oozie_app/config-default.xml | 22 ++ .../wfs/claims/oozie_app/workflow.xml | 175 +++++++++++++++ .../regular_all_steps/oozie_app/workflow.xml | 170 ++++++-------- .../oozie_app/config-default.xml | 22 ++ .../wfs/regular_step1/oozie_app/workflow.xml | 109 +++++++++ .../oozie_app/config-default.xml | 22 ++ .../wfs/regular_step2/oozie_app/workflow.xml | 74 ++++++ .../oozie_app/config-default.xml | 22 ++ .../wfs/regular_step3/oozie_app/workflow.xml | 60 +++++ 25 files changed, 1166 insertions(+), 533 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java index 1ccfd09ef0..b6ebc6b988 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateDbEntitiesApplication.java @@ -1,4 +1,14 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step1; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.asString; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listKeyValues; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty; import java.io.Closeable; import java.io.IOException; @@ -17,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication; +import eu.dnetlib.dhp.migration.utils.DbClient; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Dataset; @@ -34,7 +46,7 @@ import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable { +public class MigrateDbEntitiesApplication extends AbstractMigrationApplication implements Closeable { private static final Qualifier ENTITYREGISTRY_PROVENANCE_ACTION = qualifier("sysimport:crosswalk:entityregistry", "sysimport:crosswalk:entityregistry", "dnet:provenance_actions", "dnet:provenance_actions"); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java index 359fe7596b..dad1278e96 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step1/MigrateMongoMdstoresApplication.java @@ -1,10 +1,23 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step1; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.utils.AbstractMigrationApplication; +import eu.dnetlib.dhp.migration.utils.MdstoreClient; -public class MigrateMongoMdstoresApplication { +public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable { + + private static final Log log = LogFactory.getLog(MigrateMongoMdstoresApplication.class); + + private final MdstoreClient mdstoreClient; public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -22,24 +35,36 @@ public class MigrateMongoMdstoresApplication { final String hdfsNameNode = parser.get("namenode"); final String hdfsUser = parser.get("hdfsUser"); - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); - - if (mdFormat.equalsIgnoreCase("oaf")) { - try (final OafMigrationExecutor mig = - new OafMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) { - mig.processMdRecords(mdFormat, mdLayout, mdInterpretation); - } - } else if (mdFormat.equalsIgnoreCase("odf")) { - try (final OdfMigrationExecutor mig = - new OdfMigrationExecutor(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword)) { - mig.processMdRecords(mdFormat, mdLayout, mdInterpretation); - } - } else { - throw new RuntimeException("Format not supported: " + mdFormat); + try (MigrateMongoMdstoresApplication app = new MigrateMongoMdstoresApplication(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb)) { + app.execute(mdFormat, mdLayout, mdInterpretation); } } + public MigrateMongoMdstoresApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, + final String mongoDb) throws Exception { + super(hdfsPath, hdfsNameNode, hdfsUser); + this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); + } + + public void execute(final String format, final String layout, final String interpretation) { + final Map colls = mdstoreClient.validCollections(format, layout, interpretation); + log.info("Found " + colls.size() + " mdstores"); + + for (final Entry entry : colls.entrySet()) { + log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")"); + final String currentColl = entry.getValue(); + + for (final String xml : mdstoreClient.listRecords(currentColl)) { + emit(xml, "native_" + format); + } + } + } + + @Override + public void close() throws IOException { + super.close(); + mdstoreClient.close(); + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java index 0595726d45..245d53df08 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/AbstractMdRecordToOafMapper.java @@ -1,20 +1,24 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step2; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.dataInfo; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.journal; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.keyValue; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.listFields; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.oaiIProvenance; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.qualifier; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty; -import java.io.IOException; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.dom4j.Document; -import org.dom4j.DocumentException; import org.dom4j.DocumentFactory; import org.dom4j.DocumentHelper; import org.dom4j.Node; @@ -37,11 +41,9 @@ import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { +public abstract class AbstractMdRecordToOafMapper { - protected final Map code2name = new HashMap<>(); - - protected final MdstoreClient mdstoreClient; + protected final Map code2name; protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); @@ -51,79 +53,36 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { protected static final Qualifier SOFTWARE_RESULTTYPE_QUALIFIER = qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies"); protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER = qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies"); - private static final Log log = LogFactory.getLog(AbstractMongoExecutor.class); - - public AbstractMongoExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, - final String mongoDb, final String dbUrl, final String dbUser, - final String dbPassword) throws Exception { - - super(hdfsPath, hdfsNameNode, hdfsUser); - - this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb); - loadClassNames(dbUrl, dbUser, dbPassword); - - final Map nsContext = new HashMap<>(); - - registerNamespaces(nsContext); - - DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); + protected AbstractMdRecordToOafMapper(final Map code2name) { + this.code2name = code2name; } - private void loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException { + public List processMdRecord(final String xml) { + try { + final Map nsContext = new HashMap<>(); + nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); + nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); + nsContext.put("oaf", "http://namespace.openaire.eu/oaf"); + nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/"); + nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance"); + nsContext.put("dc", "http://purl.org/dc/elements/1.1/"); + nsContext.put("datacite", "http://datacite.org/schema/kernel-3"); + DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); - log.info("Loading vocabulary terms from db..."); + final Document doc = DocumentHelper.parseText(xml); - try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { - code2name.clear(); - dbClient.processResults("select code, name from class", rs -> { - try { - code2name.put(rs.getString("code"), rs.getString("name")); - } catch (final SQLException e) { - e.printStackTrace(); - } - }); + final String type = doc.valueOf("//dr:CobjCategory/@type"); + final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name")); + final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom + : keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name")); + + final DataInfo info = prepareDataInfo(doc); + final long lastUpdateTimestamp = new Date().getTime(); + + return createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp); + } catch (final Exception e) { + throw new RuntimeException(e); } - - log.info("Found " + code2name.size() + " terms."); - - } - - public void processMdRecords(final String mdFormat, final String mdLayout, final String mdInterpretation) throws DocumentException { - - log.info(String.format("Searching mdstores (format: %s, layout: %s, interpretation: %s)", mdFormat, mdLayout, mdInterpretation)); - - final Map colls = mdstoreClient.validCollections(mdFormat, mdLayout, mdInterpretation); - log.info("Found " + colls.size() + " mdstores"); - - for (final Entry entry : colls.entrySet()) { - log.info("Processing mdstore " + entry.getKey() + " (collection: " + entry.getValue() + ")"); - final String currentColl = entry.getValue(); - - for (final String xml : mdstoreClient.listRecords(currentColl)) { - final Document doc = DocumentHelper.parseText(xml); - - final String type = doc.valueOf("//dr:CobjCategory/@type"); - final KeyValue collectedFrom = keyValue(doc.valueOf("//oaf:collectedFrom/@id"), doc.valueOf("//oaf:collectedFrom/@name")); - final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) ? collectedFrom - : keyValue(doc.valueOf("//oaf:hostedBy/@id"), doc.valueOf("//oaf:hostedBy/@name")); - - final DataInfo info = prepareDataInfo(doc); - final long lastUpdateTimestamp = new Date().getTime(); - - for (final Oaf oaf : createOafs(doc, type, collectedFrom, hostedBy, info, lastUpdateTimestamp)) { - emitOaf(oaf); - } - } - } - log.info("All Done."); - } - - protected void registerNamespaces(final Map nsContext) { - nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); - nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); - nsContext.put("oaf", "http://namespace.openaire.eu/oaf"); - nsContext.put("oai", "http://www.openarchives.org/OAI/2.0/"); - nsContext.put("prov", "http://www.openarchives.org/OAI/2.0/provenance"); } protected List createOafs(final Document doc, @@ -432,10 +391,4 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { return res; } - @Override - public void close() throws IOException { - super.close(); - mdstoreClient.close(); - } - } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java new file mode 100644 index 0000000000..5627a720b0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java @@ -0,0 +1,170 @@ +package eu.dnetlib.dhp.migration.step2; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication; +import eu.dnetlib.dhp.migration.utils.DbClient; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; +import scala.Tuple2; + +public class GenerateEntitiesApplication { + + private static final Log log = LogFactory.getLog(GenerateEntitiesApplication.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString(MigrateMongoMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/migration/generate_entities_parameters.json"))); + + parser.parseArgument(args); + + final String sourcePaths = parser.get("sourcePaths"); + final String targetPath = parser.get("targetPath"); + + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + + final SparkSession spark = SparkSession + .builder() + .appName(GenerateEntitiesApplication.class.getSimpleName()) + .master(parser.get("master")) + .getOrCreate(); + + final Map code2name = loadClassNames(dbUrl, dbUser, dbPassword); + + try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { + final List existingSourcePaths = Arrays.stream(sourcePaths.split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList()); + generateEntities(sc, code2name, existingSourcePaths, targetPath); + } + } + + private static void generateEntities(final JavaSparkContext sc, + final Map code2name, + final List sourcePaths, + final String targetPath) { + + log.info("Generate entities from files:"); + sourcePaths.forEach(log::info); + + JavaRDD inputRdd = sc.emptyRDD(); + + for (final String sp : sourcePaths) { + inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class) + .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) + .map(k -> convertToListOaf(k._1(), k._2(), code2name)) + .flatMap(list -> list.iterator()) + .map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf))); + } + + inputRdd.saveAsTextFile(targetPath); + + } + + private static List convertToListOaf(final String id, final String s, final Map code2name) { + final String type = StringUtils.substringAfter(id, ":"); + + switch (type.toLowerCase()) { + case "native_oaf": + return new OafToOafMapper(code2name).processMdRecord(s); + case "native_odf": + return new OafToOafMapper(code2name).processMdRecord(s); + case "datasource": + return Arrays.asList(convertFromJson(s, Datasource.class)); + case "organization": + return Arrays.asList(convertFromJson(s, Organization.class)); + case "project": + return Arrays.asList(convertFromJson(s, Project.class)); + case "relation": + return Arrays.asList(convertFromJson(s, Relation.class)); + case "publication": + return Arrays.asList(convertFromJson(s, Publication.class)); + case "dataset": + return Arrays.asList(convertFromJson(s, Dataset.class)); + case "software": + return Arrays.asList(convertFromJson(s, Software.class)); + case "otherresearchproducts": + default: + return Arrays.asList(convertFromJson(s, OtherResearchProduct.class)); + } + + } + + private static Map loadClassNames(final String dbUrl, final String dbUser, final String dbPassword) throws IOException { + + log.info("Loading vocabulary terms from db..."); + + final Map map = new HashMap<>(); + + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + dbClient.processResults("select code, name from class", rs -> { + try { + map.put(rs.getString("code"), rs.getString("name")); + } catch (final SQLException e) { + e.printStackTrace(); + } + }); + } + + log.info("Found " + map.size() + " terms."); + + return map; + + } + + private static String convertToJson(final Oaf oaf) { + try { + return new ObjectMapper().writeValueAsString(oaf); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static Oaf convertFromJson(final String s, final Class clazz) { + try { + return new ObjectMapper().readValue(s, clazz); + } catch (final Exception e) { + log.error("Error parsing object of class: " + clazz); + log.error(s); + throw new RuntimeException(e); + } + } + + private static boolean exists(final JavaSparkContext context, final String pathToFile) { + try { + final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration()); + final Path path = new Path(pathToFile); + return hdfs.exists(path); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java index c325682905..0ca13941d1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OafToOafMapper.java @@ -1,16 +1,17 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step2; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.dom4j.Document; import org.dom4j.Node; -import eu.dnetlib.dhp.migration.pace.PacePerson; +import eu.dnetlib.dhp.migration.utils.PacePerson; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Field; @@ -22,20 +23,10 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class OafMigrationExecutor extends AbstractMongoExecutor { +public class OafToOafMapper extends AbstractMdRecordToOafMapper { - private static final Log log = LogFactory.getLog(OafMigrationExecutor.class); - - public OafMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb, - final String dbUrl, final String dbUser, - final String dbPassword) throws Exception { - super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword); - } - - @Override - protected void registerNamespaces(final Map nsContext) { - super.registerNamespaces(nsContext); - nsContext.put("dc", "http://purl.org/dc/elements/1.1/"); + public OafToOafMapper(final Map code2name) { + super(code2name); } @Override diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java index 457534085a..01c2d4b076 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/OdfToOafMapper.java @@ -1,4 +1,8 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.step2; + +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.migration.utils.OafMapperUtils.structuredProperty; import java.util.ArrayList; import java.util.Arrays; @@ -6,8 +10,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.dom4j.Document; import org.dom4j.Node; @@ -22,38 +24,28 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -public class OdfMigrationExecutor extends AbstractMongoExecutor { +public class OdfToOafMapper extends AbstractMdRecordToOafMapper { - private static final Log log = LogFactory.getLog(OdfMigrationExecutor.class); - - public OdfMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser, final String mongoBaseUrl, final String mongoDb, - final String dbUrl, final String dbUser, - final String dbPassword) throws Exception { - super(hdfsPath, hdfsNameNode, hdfsUser, mongoBaseUrl, mongoDb, dbUrl, dbUser, dbPassword); - } - - @Override - protected void registerNamespaces(final Map nsContext) { - super.registerNamespaces(nsContext); - nsContext.put("dc", "http://datacite.org/schema/kernel-3"); + public OdfToOafMapper(final Map code2name) { + super(code2name); } @Override protected List prepareTitles(final Document doc, final DataInfo info) { - return prepareListStructProps(doc, "//dc:title", MAIN_TITLE_QUALIFIER, info); + return prepareListStructProps(doc, "//datacite:title", MAIN_TITLE_QUALIFIER, info); } @Override protected List prepareAuthors(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); int pos = 1; - for (final Object o : doc.selectNodes("//dc:creator")) { + for (final Object o : doc.selectNodes("//datacite:creator")) { final Node n = (Node) o; final Author author = new Author(); - author.setFullname(n.valueOf("./dc:creatorName")); - author.setName(n.valueOf("./dc:givenName")); - author.setSurname(n.valueOf("./dc:familyName")); - author.setAffiliation(prepareListFields(doc, "./dc:affiliation", info)); + author.setFullname(n.valueOf("./datacite:creatorName")); + author.setName(n.valueOf("./datacite:givenName")); + author.setSurname(n.valueOf("./datacite:familyName")); + author.setAffiliation(prepareListFields(doc, "./datacite:affiliation", info)); author.setPid(preparePids(doc, info)); author.setRank(pos++); res.add(author); @@ -63,7 +55,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { private List preparePids(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("./dc:nameIdentifier")) { + for (final Object o : doc.selectNodes("./datacite:nameIdentifier")) { res.add(structuredProperty(((Node) o).getText(), prepareQualifier((Node) o, "./@nameIdentifierScheme", "dnet:pid_types", "dnet:pid_types"), info)); } return res; @@ -72,7 +64,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List prepareInstances(final Document doc, final DataInfo info, final KeyValue collectedfrom, final KeyValue hostedby) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("//dc:alternateIdentifier[@alternateIdentifierType='URL']")) { + for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) { final Instance instance = new Instance(); instance.setUrl(Arrays.asList(((Node) o).getText().trim())); instance.setInstancetype(prepareQualifier(doc, "//dr:CobjCategory", "dnet:publication_resource", "dnet:publication_resource")); @@ -98,7 +90,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List prepareRelevantDates(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("//dc:date")) { + for (final Object o : doc.selectNodes("//datacite:date")) { final String dateType = ((Node) o).valueOf("@dateType"); if (StringUtils.isBlank(dateType) && !dateType.equalsIgnoreCase("Accepted") && !dateType.equalsIgnoreCase("Issued") && !dateType.equalsIgnoreCase("Updated") && !dateType.equalsIgnoreCase("Available")) { @@ -115,32 +107,32 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List> prepareContributors(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:contributorName", info); + return prepareListFields(doc, "//datacite:contributorName", info); } @Override protected List> prepareFormats(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:format", info); + return prepareListFields(doc, "//datacite:format", info); } @Override protected Field preparePublisher(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:publisher", info); + return prepareField(doc, "//datacite:publisher", info); } @Override protected List> prepareDescriptions(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:description[@descriptionType='Abstract']", info); + return prepareListFields(doc, "//datacite:description[@descriptionType='Abstract']", info); } @Override protected List prepareSubjects(final Document doc, final DataInfo info) { - return prepareListStructProps(doc, "//dc:subject", info); + return prepareListStructProps(doc, "//datacite:subject", info); } @Override protected Qualifier prepareLanguages(final Document doc) { - return prepareQualifier(doc, "//dc:language", "dnet:languages", "dnet:languages"); + return prepareQualifier(doc, "//datacite:language", "dnet:languages", "dnet:languages"); } @Override @@ -150,17 +142,17 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List> prepareOtherResearchProductContactGroups(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:contributor[@contributorType='ContactGroup']/dc:contributorName", info); + return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName", info); } @Override protected List> prepareOtherResearchProductContactPersons(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:contributor[@contributorType='ContactPerson']/dc:contributorName", info); + return prepareListFields(doc, "//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName", info); } @Override protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) { - return prepareQualifier(doc, "//dc:format", "dnet:programming_languages", "dnet:programming_languages"); + return prepareQualifier(doc, "//datacite:format", "dnet:programming_languages", "dnet:programming_languages"); } @Override @@ -175,7 +167,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected List> prepareSoftwareDocumentationUrls(final Document doc, final DataInfo info) { - return prepareListFields(doc, "//dc:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info); + return prepareListFields(doc, "//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info); } // DATASETS @@ -184,11 +176,11 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { protected List prepareDatasetGeoLocations(final Document doc, final DataInfo info) { final List res = new ArrayList<>(); - for (final Object o : doc.selectNodes("//dc:geoLocation")) { + for (final Object o : doc.selectNodes("//datacite:geoLocation")) { final GeoLocation loc = new GeoLocation(); - loc.setBox(((Node) o).valueOf("./dc:geoLocationBox")); - loc.setPlace(((Node) o).valueOf("./dc:geoLocationPlace")); - loc.setPoint(((Node) o).valueOf("./dc:geoLocationPoint")); + loc.setBox(((Node) o).valueOf("./datacite:geoLocationBox")); + loc.setPlace(((Node) o).valueOf("./datacite:geoLocationPlace")); + loc.setPoint(((Node) o).valueOf("./datacite:geoLocationPoint")); res.add(loc); } return res; @@ -201,17 +193,17 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected Field prepareDatasetLastMetadataUpdate(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:date[@dateType='Updated']", info); + return prepareField(doc, "//datacite:date[@dateType='Updated']", info); } @Override protected Field prepareDatasetVersion(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:version", info); + return prepareField(doc, "//datacite:version", info); } @Override protected Field prepareDatasetSize(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:size", info); + return prepareField(doc, "//datacite:size", info); } @Override @@ -221,7 +213,7 @@ public class OdfMigrationExecutor extends AbstractMongoExecutor { @Override protected Field prepareDatasetStorageDate(final Document doc, final DataInfo info) { - return prepareField(doc, "//dc:date[@dateType='Issued']", info); + return prepareField(doc, "//datacite:date[@dateType='Issued']", info); } @Override diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java index 3b6fc9b5de..222828a494 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java @@ -1,22 +1,14 @@ -package eu.dnetlib.dhp.migration; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; +package eu.dnetlib.dhp.migration.step3; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication; import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Organization; @@ -25,70 +17,52 @@ import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Software; -import scala.Tuple2; -public class ExtractEntitiesFromHDFSJob { +public class DispatchEntitiesApplication { - private static final Log log = LogFactory.getLog(ExtractEntitiesFromHDFSJob.class); + private static final Log log = LogFactory.getLog(DispatchEntitiesApplication.class); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString(MigrateMongoMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json"))); parser.parseArgument(args); final SparkSession spark = SparkSession .builder() - .appName(ExtractEntitiesFromHDFSJob.class.getSimpleName()) + .appName(DispatchEntitiesApplication.class.getSimpleName()) .master(parser.get("master")) .getOrCreate(); try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) { - final List sourcePaths = Arrays.stream(parser.get("sourcePaths").split(",")).filter(p -> exists(sc, p)).collect(Collectors.toList()); + final String sourcePath = parser.get("sourcePath"); final String targetPath = parser.get("graphRawPath"); - processEntity(sc, Publication.class, sourcePaths, targetPath); - processEntity(sc, Dataset.class, sourcePaths, targetPath); - processEntity(sc, Software.class, sourcePaths, targetPath); - processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath); - processEntity(sc, Datasource.class, sourcePaths, targetPath); - processEntity(sc, Organization.class, sourcePaths, targetPath); - processEntity(sc, Project.class, sourcePaths, targetPath); - processEntity(sc, Relation.class, sourcePaths, targetPath); + processEntity(sc, Publication.class, sourcePath, targetPath); + processEntity(sc, Dataset.class, sourcePath, targetPath); + processEntity(sc, Software.class, sourcePath, targetPath); + processEntity(sc, OtherResearchProduct.class, sourcePath, targetPath); + processEntity(sc, Datasource.class, sourcePath, targetPath); + processEntity(sc, Organization.class, sourcePath, targetPath); + processEntity(sc, Project.class, sourcePath, targetPath); + processEntity(sc, Relation.class, sourcePath, targetPath); } } - private static void processEntity(final JavaSparkContext sc, final Class clazz, final List sourcePaths, final String targetPath) { + private static void processEntity(final JavaSparkContext sc, final Class clazz, final String sourcePath, final String targetPath) { final String type = clazz.getSimpleName().toLowerCase(); - log.info(String.format("Processing entities (%s) in files:", type)); - sourcePaths.forEach(log::info); - - JavaRDD inputRdd = sc.emptyRDD(); - - for (final String sp : sourcePaths) { - inputRdd = inputRdd.union(sc.sequenceFile(sp, Text.class, Text.class) - .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) - .filter(k -> isEntityType(k._1(), type)) - .map(Tuple2::_2)); - } - - inputRdd.saveAsTextFile(targetPath + "/" + type); + log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath)); + sc.textFile(sourcePath) + .filter(l -> isEntityType(l, type)) + .map(l -> StringUtils.substringAfter(l, "|")) + .saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ??? } - private static boolean isEntityType(final String item, final String type) { - return StringUtils.substringAfter(item, ":").equalsIgnoreCase(type); + private static boolean isEntityType(final String line, final String type) { + return StringUtils.substringBefore(line, "|").equalsIgnoreCase(type); } - private static boolean exists(final JavaSparkContext context, final String pathToFile) { - try { - final FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration()); - final Path path = new Path(pathToFile); - return hdfs.exists(path); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java index b0db3c76fd..41f9f8145c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/AbstractMigrationApplication.java @@ -1,16 +1,10 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.utils; import java.io.Closeable; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -20,19 +14,9 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.codehaus.jackson.map.ObjectMapper; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.ExtraInfo; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.Journal; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.OAIProvenance; import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OriginDescription; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.utils.DHPUtils; -public class AbstractMigrationExecutor implements Closeable { +public class AbstractMigrationApplication implements Closeable { private final AtomicInteger counter = new AtomicInteger(0); @@ -40,13 +24,13 @@ public class AbstractMigrationExecutor implements Closeable { private final Text value = new Text(); - private final ObjectMapper objectMapper = new ObjectMapper(); - private final SequenceFile.Writer writer; - private static final Log log = LogFactory.getLog(AbstractMigrationExecutor.class); + private final ObjectMapper objectMapper = new ObjectMapper(); - public AbstractMigrationExecutor(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception { + private static final Log log = LogFactory.getLog(AbstractMigrationApplication.class); + + public AbstractMigrationApplication(final String hdfsPath, final String hdfsNameNode, final String hdfsUser) throws Exception { log.info(String.format("Creating SequenceFile Writer, hdfsPath=%s, nameNode=%s, user=%s", hdfsPath, hdfsNameNode, hdfsUser)); @@ -65,186 +49,32 @@ public class AbstractMigrationExecutor implements Closeable { return conf; } - protected void emitOaf(final Oaf oaf) { + protected void emit(final String s, final String type) { try { - key.set(counter.getAndIncrement() + ":" + oaf.getClass().getSimpleName().toLowerCase()); - value.set(objectMapper.writeValueAsString(oaf)); + key.set(counter.getAndIncrement() + ":" + type); + value.set(s); writer.append(key, value); } catch (final Exception e) { throw new RuntimeException(e); } } + protected void emitOaf(final Oaf oaf) { + try { + emit(objectMapper.writeValueAsString(oaf), oaf.getClass().getSimpleName().toLowerCase()); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + @Override public void close() throws IOException { writer.hflush(); writer.close(); } - public static KeyValue keyValue(final String k, final String v) { - final KeyValue kv = new KeyValue(); - kv.setKey(k); - kv.setValue(v); - return kv; - } - - public static List listKeyValues(final String... s) { - if (s.length % 2 > 0) { throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); } - - final List list = new ArrayList<>(); - for (int i = 0; i < s.length; i += 2) { - list.add(keyValue(s[i], s[i + 1])); - } - return list; - } - - public static Field field(final T value, final DataInfo info) { - if (value == null || StringUtils.isBlank(value.toString())) { return null; } - - final Field field = new Field<>(); - field.setValue(value); - field.setDataInfo(info); - return field; - } - - public static List> listFields(final DataInfo info, final String... values) { - return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList()); - } - - public static List> listFields(final DataInfo info, final List values) { - return values.stream().map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList()); - } - - public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) { - final Qualifier q = new Qualifier(); - q.setClassid(classid); - q.setClassname(classname); - q.setSchemeid(schemeid); - q.setSchemename(schemename); - return q; - } - - public static StructuredProperty structuredProperty(final String value, - final String classid, - final String classname, - final String schemeid, - final String schemename, - final DataInfo dataInfo) { - - return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo); - } - - public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) { - if (value == null) { return null; } - final StructuredProperty sp = new StructuredProperty(); - sp.setValue(value); - sp.setQualifier(qualifier); - sp.setDataInfo(dataInfo); - return sp; - } - - public static ExtraInfo extraInfo(final String name, final String value, final String typology, final String provenance, final String trust) { - final ExtraInfo info = new ExtraInfo(); - info.setName(name); - info.setValue(value); - info.setTypology(typology); - info.setProvenance(provenance); - info.setTrust(trust); - return info; - } - - public static OAIProvenance oaiIProvenance(final String identifier, - final String baseURL, - final String metadataNamespace, - final Boolean altered, - final String datestamp, - final String harvestDate) { - - final OriginDescription desc = new OriginDescription(); - desc.setIdentifier(identifier); - desc.setBaseURL(baseURL); - desc.setMetadataNamespace(metadataNamespace); - desc.setAltered(altered); - desc.setDatestamp(datestamp); - desc.setHarvestDate(harvestDate); - - final OAIProvenance p = new OAIProvenance(); - p.setOriginDescription(desc); - - return p; - } - - public static Journal journal(final String name, - final String issnPrinted, - final String issnOnline, - final String issnLinking, - final String ep, - final String iss, - final String sp, - final String vol, - final String edition, - final String conferenceplace, - final String conferencedate, - final DataInfo dataInfo) { - - if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) { - final Journal j = new Journal(); - j.setName(name); - j.setIssnPrinted(issnPrinted); - j.setIssnOnline(issnOnline); - j.setIssnLinking(issnLinking); - j.setEp(ep); - j.setIss(iss); - j.setSp(sp); - j.setVol(vol); - j.setEdition(edition); - j.setConferenceplace(conferenceplace); - j.setConferencedate(conferencedate); - j.setDataInfo(dataInfo); - return j; - } else { - return null; - } - } - - public static DataInfo dataInfo(final Boolean deletedbyinference, - final String inferenceprovenance, - final Boolean inferred, - final Boolean invisible, - final Qualifier provenanceaction, - final String trust) { - final DataInfo d = new DataInfo(); - d.setDeletedbyinference(deletedbyinference); - d.setInferenceprovenance(inferenceprovenance); - d.setInferred(inferred); - d.setInvisible(invisible); - d.setProvenanceaction(provenanceaction); - d.setTrust(trust); - return d; - } - - public static String createOpenaireId(final int prefix, final String originalId) { - final String nsPrefix = StringUtils.substringBefore(originalId, "::"); - final String rest = StringUtils.substringAfter(originalId, "::"); - return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); - } - - public static String createOpenaireId(final String type, final String originalId) { - switch (type) { - case "datasource": - return createOpenaireId(10, originalId); - case "organization": - return createOpenaireId(20, originalId); - case "person": - return createOpenaireId(30, originalId); - case "project": - return createOpenaireId(40, originalId); - default: - return createOpenaireId(50, originalId); - } - } - - public static String asString(final Object o) { - return o == null ? "" : o.toString(); - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java index 9ac0089d25..43270e4523 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/DbClient.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.utils; import java.io.Closeable; import java.io.IOException; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java index 87dadfc7aa..612503da74 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/MdstoreClient.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.migration; +package eu.dnetlib.dhp.migration.utils; import java.io.Closeable; import java.io.IOException; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java new file mode 100644 index 0000000000..89b344c685 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/OafMapperUtils.java @@ -0,0 +1,191 @@ +package eu.dnetlib.dhp.migration.utils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.ExtraInfo; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Journal; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OAIProvenance; +import eu.dnetlib.dhp.schema.oaf.OriginDescription; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.utils.DHPUtils; + +public class OafMapperUtils { + + public static KeyValue keyValue(final String k, final String v) { + final KeyValue kv = new KeyValue(); + kv.setKey(k); + kv.setValue(v); + return kv; + } + + public static List listKeyValues(final String... s) { + if (s.length % 2 > 0) { throw new RuntimeException("Invalid number of parameters (k,v,k,v,....)"); } + + final List list = new ArrayList<>(); + for (int i = 0; i < s.length; i += 2) { + list.add(keyValue(s[i], s[i + 1])); + } + return list; + } + + public static Field field(final T value, final DataInfo info) { + if (value == null || StringUtils.isBlank(value.toString())) { return null; } + + final Field field = new Field<>(); + field.setValue(value); + field.setDataInfo(info); + return field; + } + + public static List> listFields(final DataInfo info, final String... values) { + return Arrays.stream(values).map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList()); + } + + public static List> listFields(final DataInfo info, final List values) { + return values.stream().map(v -> field(v, info)).filter(Objects::nonNull).collect(Collectors.toList()); + } + + public static Qualifier qualifier(final String classid, final String classname, final String schemeid, final String schemename) { + final Qualifier q = new Qualifier(); + q.setClassid(classid); + q.setClassname(classname); + q.setSchemeid(schemeid); + q.setSchemename(schemename); + return q; + } + + public static StructuredProperty structuredProperty(final String value, + final String classid, + final String classname, + final String schemeid, + final String schemename, + final DataInfo dataInfo) { + + return structuredProperty(value, qualifier(classid, classname, schemeid, schemename), dataInfo); + } + + public static StructuredProperty structuredProperty(final String value, final Qualifier qualifier, final DataInfo dataInfo) { + if (value == null) { return null; } + final StructuredProperty sp = new StructuredProperty(); + sp.setValue(value); + sp.setQualifier(qualifier); + sp.setDataInfo(dataInfo); + return sp; + } + + public static ExtraInfo extraInfo(final String name, final String value, final String typology, final String provenance, final String trust) { + final ExtraInfo info = new ExtraInfo(); + info.setName(name); + info.setValue(value); + info.setTypology(typology); + info.setProvenance(provenance); + info.setTrust(trust); + return info; + } + + public static OAIProvenance oaiIProvenance(final String identifier, + final String baseURL, + final String metadataNamespace, + final Boolean altered, + final String datestamp, + final String harvestDate) { + + final OriginDescription desc = new OriginDescription(); + desc.setIdentifier(identifier); + desc.setBaseURL(baseURL); + desc.setMetadataNamespace(metadataNamespace); + desc.setAltered(altered); + desc.setDatestamp(datestamp); + desc.setHarvestDate(harvestDate); + + final OAIProvenance p = new OAIProvenance(); + p.setOriginDescription(desc); + + return p; + } + + public static Journal journal(final String name, + final String issnPrinted, + final String issnOnline, + final String issnLinking, + final String ep, + final String iss, + final String sp, + final String vol, + final String edition, + final String conferenceplace, + final String conferencedate, + final DataInfo dataInfo) { + + if (StringUtils.isNotBlank(name) || StringUtils.isNotBlank(issnPrinted) || StringUtils.isNotBlank(issnOnline) || StringUtils.isNotBlank(issnLinking)) { + final Journal j = new Journal(); + j.setName(name); + j.setIssnPrinted(issnPrinted); + j.setIssnOnline(issnOnline); + j.setIssnLinking(issnLinking); + j.setEp(ep); + j.setIss(iss); + j.setSp(sp); + j.setVol(vol); + j.setEdition(edition); + j.setConferenceplace(conferenceplace); + j.setConferencedate(conferencedate); + j.setDataInfo(dataInfo); + return j; + } else { + return null; + } + } + + public static DataInfo dataInfo(final Boolean deletedbyinference, + final String inferenceprovenance, + final Boolean inferred, + final Boolean invisible, + final Qualifier provenanceaction, + final String trust) { + final DataInfo d = new DataInfo(); + d.setDeletedbyinference(deletedbyinference); + d.setInferenceprovenance(inferenceprovenance); + d.setInferred(inferred); + d.setInvisible(invisible); + d.setProvenanceaction(provenanceaction); + d.setTrust(trust); + return d; + } + + public static String createOpenaireId(final int prefix, final String originalId) { + final String nsPrefix = StringUtils.substringBefore(originalId, "::"); + final String rest = StringUtils.substringAfter(originalId, "::"); + return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); + } + + public static String createOpenaireId(final String type, final String originalId) { + switch (type) { + case "datasource": + return createOpenaireId(10, originalId); + case "organization": + return createOpenaireId(20, originalId); + case "person": + return createOpenaireId(30, originalId); + case "project": + return createOpenaireId(40, originalId); + default: + return createOpenaireId(50, originalId); + } + } + + public static String asString(final Object o) { + return o == null ? "" : o.toString(); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java index 927f5641bf..69e128e63c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/utils/PacePerson.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.migration.pace; +package eu.dnetlib.dhp.migration.utils; import java.nio.charset.Charset; import java.text.Normalizer; diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json index 0039493e7b..8c81290ca2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/dispatch_entities_parameters.json @@ -1,8 +1,8 @@ [ { "paramName": "s", - "paramLongName": "sourcePaths", - "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)", + "paramLongName": "sourcePath", + "paramDescription": "the source path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json new file mode 100644 index 0000000000..53ee010c45 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/generate_entities_parameters.json @@ -0,0 +1,39 @@ +[ + { + "paramName": "s", + "paramLongName": "sourcePaths", + "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)", + "paramRequired": true + }, + { + "paramName": "mt", + "paramLongName": "master", + "paramDescription": "should be local or yarn", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "targetPath", + "paramDescription": "the path of the target file", + "paramRequired": true + }, + { + "paramName": "pgurl", + "paramLongName": "postgresUrl", + "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", + "paramRequired": true + }, + { + "paramName": "pguser", + "paramLongName": "postgresUser", + "paramDescription": "postgres user", + "paramRequired": false + }, + { + "paramName": "pgpasswd", + "paramLongName": "postgresPassword", + "paramDescription": "postgres password", + "paramRequired": false + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json index 4506e2ae1a..39e0dd5acc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json @@ -18,19 +18,19 @@ "paramRequired": true }, { - "paramName": "dburl", + "paramName": "pgurl", "paramLongName": "postgresUrl", "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", "paramRequired": true }, { - "paramName": "dbuser", + "paramName": "pguser", "paramLongName": "postgresUser", "paramDescription": "postgres user", "paramRequired": false }, { - "paramName": "dbpasswd", + "paramName": "pgpasswd", "paramLongName": "postgresPassword", "paramDescription": "postgres password", "paramRequired": false diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json index 5738daa762..fc900e97dd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_mongo_mstores_parameters.json @@ -24,7 +24,7 @@ "paramRequired": true }, { - "paramName": "db", + "paramName": "mongodb", "paramLongName": "mongoDb", "paramDescription": "mongo database", "paramRequired": true @@ -46,23 +46,5 @@ "paramLongName": "mdInterpretation", "paramDescription": "metadata interpretation", "paramRequired": true - }, - { - "paramName": "pgurl", - "paramLongName": "postgresUrl", - "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb", - "paramRequired": true - }, - { - "paramName": "pguser", - "paramLongName": "postgresUser", - "paramDescription": "postgres user", - "paramRequired": false - }, - { - "paramName": "pgpasswd", - "paramLongName": "postgresPassword", - "paramDescription": "postgres password", - "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml new file mode 100644 index 0000000000..51e48d8f75 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml new file mode 100644 index 0000000000..7a1ee2cae0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/claims/oozie_app/workflow.xml @@ -0,0 +1,175 @@ + + + + migrationClaimsPathStep1 + the base path to store hdfs file + + + migrationClaimsPathStep2 + the temporary path to store entities before dispatching + + + migrationClaimsPathStep3 + the graph Raw base path + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication + -p${migrationClaimsPathStep1}/db_claims + -n${nameNode} + -u${hdfsUser} + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + -aclaims + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationClaimsPathStep1}/odf_claims + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -iclaim + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationClaimsPathStep1}/oaf_claims + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -iclaim + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateClaimEntities + eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims + -t${migrationClaimsPathStep2}/claim_entities + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateClaimGraph + eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationClaimsPathStep2}/claim_entities + -g${migrationClaimsPathStep3} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml index 6589633217..e273722407 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml @@ -1,14 +1,17 @@ - + - workingPath + migrationPathStep1 the base path to store hdfs file - graphRawPath + migrationPathStep2 + the temporary path to store entities before dispatching + + + migrationPathStep3 the graph Raw base path - postgresURL the postgres URL to access to the database @@ -22,7 +25,7 @@ the password postgres - mongourl + mongoURL mongoDB url, example: mongodb://[username:password@]host[:port] @@ -51,152 +54,117 @@ - - + + - + - + ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication - -p${workingPath}/db_entities + eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication + -p${migrationPathStep1}/db_records -n${nameNode} -u${hdfsUser} - -dburl${postgresURL} - -dbuser${postgresUser} - -dbpasswd${postgresPassword} + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} - + - + ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication - -p${workingPath}/db_claims + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/odf_records -n${nameNode} -u${hdfsUser} - -dburl${postgresURL} - -dbuser${postgresUser} - -dbpasswd${postgresPassword} - -aclaims - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/odf_entities - -n${nameNode} - -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} + -mongourl${mongoURL} + -mongodb${mongoDb} -fODF -lstore -icleaned - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - + - + ${jobTracker} ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/oaf_entities + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/oaf_records -n${nameNode} -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} + -mongourl${mongoURL} + -mongodb${mongoDb} -fOAF -lstore -icleaned - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - + - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/odf_claims - -n${nameNode} - -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} - -fODF - -lstore - -iclaim - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication - -p${workingPath}/oaf_claims - -n${nameNode} - -u${hdfsUser} - -mongourl${mongourl} - -db${mongoDb} - -fOAF - -lstore - -iclaim - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - - - - - + - - + + - + - + ${jobTracker} ${nameNode} yarn-cluster cluster - ExtractEntities - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob + GenerateEntities + eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication dhp-aggregation-${projectVersion}.jar --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" -mt yarn-cluster - -s${workingPath}/db_entities,${workingPath}/oaf_entities,${workingPath}/odf_entities - -g${graphRawPath} + -s${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records + -t${migrationPathStep2}/all_entities + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateGraph + eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep2}/all_entities + -g${migrationPathStep3} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml new file mode 100644 index 0000000000..51e48d8f75 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml new file mode 100644 index 0000000000..7b3c5a746d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step1/oozie_app/workflow.xml @@ -0,0 +1,109 @@ + + + + migrationPathStep1 + the base path to store hdfs file + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication + -p${migrationPathStep1}/db_records + -n${nameNode} + -u${hdfsUser} + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/odf_records + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -icleaned + + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication + -p${migrationPathStep1}/oaf_records + -n${nameNode} + -u${hdfsUser} + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -icleaned + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml new file mode 100644 index 0000000000..51e48d8f75 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml new file mode 100644 index 0000000000..cd0a4025e8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step2/oozie_app/workflow.xml @@ -0,0 +1,74 @@ + + + + migrationPathStep1 + the base path to store hdfs file + + + migrationPathStep2 + the temporary path to store entities before dispatching + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateEntities + eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep1}/db_records,${migrationPathStep1}/oaf_records,${migrationPathStep1}/odf_records + -t${migrationPathStep2}/all_entities + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml new file mode 100644 index 0000000000..51e48d8f75 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/config-default.xml @@ -0,0 +1,22 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hdfsUser + dnet + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml new file mode 100644 index 0000000000..8688f09d18 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_step3/oozie_app/workflow.xml @@ -0,0 +1,60 @@ + + + + + migrationPathStep2 + the temporary path to store entities before dispatching + + + migrationPathStep3 + the graph Raw base path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GenerateGraph + eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication + dhp-aggregation-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + -s${migrationPathStep2}/all_entities + -g${migrationPathStep3} + + + + + + + \ No newline at end of file