From 76ee85141a10fead8a67c807416d6cb3b88a167d Mon Sep 17 00:00:00 2001
From: "sandro.labruzzo"
Date: Mon, 17 Feb 2020 12:31:44 +0100
Subject: [PATCH] added oozie job for DNET migration and implemented Spark job
 for extracting entities

---
 .../migration/ExtractEntitiesFromHDFSJob.java |  56 ++++
 ...extract_entities_from_hdfs_parameters.json |  26 ++
 .../dhp/migration/oozie_app/config-default.xml |  22 ++
 .../dhp/migration/oozie_app/workflow.xml      | 282 ++++++++++++++++++
 .../dnetlib/dhp/graph/GraphMappingUtils.java  |  12 +-
 pom.xml                                       |   2 +-
 6 files changed, 393 insertions(+), 7 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java
new file mode 100644
index 000000000..f2d9caebf
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java
@@ -0,0 +1,56 @@
+package eu.dnetlib.dhp.migration;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ExtractEntitiesFromHDFSJob {
+
+    private static final List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
+
+    public static void main(String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+                IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
+        parser.parseArgument(args);
+
+        final SparkSession spark = SparkSession
+                .builder()
+                .appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
+                .master(parser.get("master"))
+                .getOrCreate();
+
+        final String sourcePath = parser.get("sourcePath");
+        final String targetPath = parser.get("graphRawPath");
+        final String entity = parser.get("entity");
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        // Union the records of all source folders, keeping only the entries
+        // whose key matches the requested entity type. RDD.union returns a
+        // new RDD, so the result must be reassigned rather than discarded.
+        JavaRDD<String> inputRdd = sc.emptyRDD();
+        for (final String p : folderNames) {
+            inputRdd = inputRdd.union(
+                    sc.sequenceFile(sourcePath + "/" + p, Text.class, Text.class)
+                            .map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
+                            .filter(k -> isEntityType(k._1(), entity))
+                            .map(Tuple2::_2));
+        }
+
+        inputRdd.saveAsTextFile(targetPath + "/" + entity);
+    }
+
+    private static boolean isEntityType(final String item, final String entity) {
+        return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
+    }
+}
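
A minimal local smoke run of the new job (not part of the patch) might look like
the sketch below; the input/output paths and the "local[*]" master are
illustrative assumptions, and the option names mirror the parameter
specification added in the next file.

    package eu.dnetlib.dhp.migration;

    public class ExtractEntitiesLocalRunSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical paths; sourcePath must contain the db_entities,
            // oaf_entities and odf_entities sequence files.
            ExtractEntitiesFromHDFSJob.main(new String[] {
                    "-mt", "local[*]",          // master
                    "-s", "/tmp/migration",     // sourcePath
                    "-g", "/tmp/graph_raw",     // graphRawPath
                    "-e", "publication"         // entity to extract
            });
        }
    }
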
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json
new file mode 100644
index 000000000..f179ee0f8
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json
@@ -0,0 +1,26 @@
+[
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the HDFS source path which contains the sequence files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mt",
+    "paramLongName": "master",
+    "paramDescription": "should be local or yarn",
+    "paramRequired": true
+  },
+  {
+    "paramName": "g",
+    "paramLongName": "graphRawPath",
+    "paramDescription": "the path of the raw graph in HDFS",
+    "paramRequired": true
+  },
+  {
+    "paramName": "e",
+    "paramLongName": "entity",
+    "paramDescription": "the entity to extract",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml
new file mode 100644
index 000000000..51e48d8f7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/config-default.xml
@@ -0,0 +1,22 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hdfsUser</name>
+        <value>dnet</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml
new file mode 100644
index 000000000..309a6d90f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml
@@ -0,0 +1,282 @@
+<workflow-app name="DNET migration" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>workingPath</name>
+            <description>the base path to store HDFS files</description>
+        </property>
+        <property>
+            <name>graphRawPath</name>
+            <description>the raw graph base path</description>
+        </property>
+        <property>
+            <name>postgresURL</name>
+            <description>the PostgreSQL URL to access the database</description>
+        </property>
+        <property>
+            <name>postgresUser</name>
+            <description>the PostgreSQL user</description>
+        </property>
+        <property>
+            <name>postgresPassword</name>
+            <description>the PostgreSQL password</description>
+        </property>
+        <property>
+            <name>mongourl</name>
+            <description>mongoDB url, example: mongodb://[username:password@]host[:port]</description>
+        </property>
+        <property>
+            <name>mongoDb</name>
+            <description>mongo database</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+    </parameters>
+
+    <start to="ResetWorkingPath"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="ResetWorkingPath">
+        <fs>
+            <delete path='${workingPath}'/>
+            <mkdir path='${workingPath}'/>
+        </fs>
+        <ok to="ImportDB"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ImportDB">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
+            <arg>-p</arg><arg>${workingPath}/db_entities</arg>
+            <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-u</arg><arg>${hdfsUser}</arg>
+            <arg>-dburl</arg><arg>${postgresURL}</arg>
+            <arg>-dbuser</arg><arg>${postgresUser}</arg>
+            <arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
+        </java>
+        <ok to="ImportODF"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ImportODF">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
+            <arg>-p</arg><arg>${workingPath}/odf_entities</arg>
+            <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-u</arg><arg>${hdfsUser}</arg>
+            <arg>-mongourl</arg><arg>${mongourl}</arg>
+            <arg>-db</arg><arg>${mongoDb}</arg>
+            <arg>-f</arg><arg>ODF</arg>
+            <arg>-l</arg><arg>store</arg>
+            <arg>-i</arg><arg>cleaned</arg>
+            <arg>-pgurl</arg><arg>${postgresURL}</arg>
+            <arg>-pguser</arg><arg>${postgresUser}</arg>
+            <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
+        </java>
+        <ok to="ImportOAF"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ImportOAF">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
+            <arg>-p</arg><arg>${workingPath}/oaf_entities</arg>
+            <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-u</arg><arg>${hdfsUser}</arg>
+            <arg>-mongourl</arg><arg>${mongourl}</arg>
+            <arg>-db</arg><arg>${mongoDb}</arg>
+            <arg>-f</arg><arg>OAF</arg>
+            <arg>-l</arg><arg>store</arg>
+            <arg>-i</arg><arg>cleaned</arg>
+            <arg>-pgurl</arg><arg>${postgresURL}</arg>
+            <arg>-pguser</arg><arg>${postgresUser}</arg>
+            <arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
+        </java>
+        <ok to="ExtractPublication"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractPublication">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: publication</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/publication</arg>
+            <arg>-e</arg><arg>publication</arg>
+        </spark>
+        <ok to="ExtractDataset"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: dataset</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/dataset</arg>
+            <arg>-e</arg><arg>dataset</arg>
+        </spark>
+        <ok to="ExtractSoftware"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractSoftware">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: software</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/software</arg>
+            <arg>-e</arg><arg>software</arg>
+        </spark>
+        <ok to="ExtractOtherResearchProduct"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractOtherResearchProduct">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: otherresearchproduct</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/otherresearchproduct</arg>
+            <arg>-e</arg><arg>otherresearchproduct</arg>
+        </spark>
+        <ok to="ExtractDatasource"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractDatasource">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: datasource</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/datasource</arg>
+            <arg>-e</arg><arg>datasource</arg>
+        </spark>
+        <ok to="ExtractOrganization"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractOrganization">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: organization</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/organization</arg>
+            <arg>-e</arg><arg>organization</arg>
+        </spark>
+        <ok to="ExtractProject"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractProject">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: project</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/project</arg>
+            <arg>-e</arg><arg>project</arg>
+        </spark>
+        <ok to="ExtractRelation"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ExtractRelation">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExtractEntities: relation</name>
+            <class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
+            <arg>-mt</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}</arg>
+            <arg>-g</arg><arg>${graphRawPath}/relation</arg>
+            <arg>-e</arg><arg>relation</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
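
The eight Spark actions above share identical spark-opts and differ only in the
-g/-e argument pair; each run keeps the records whose sequence-file key names
the requested entity. A minimal sketch of that filter follows, assuming a
hypothetical "id:entitytype" key layout (the actual layout is produced by the
Migrate* applications and is not shown in this patch).

    import org.apache.commons.lang3.StringUtils;

    public class EntityKeyFilterSketch {
        public static void main(String[] args) {
            final String key = "record-0001:publication"; // hypothetical key layout
            final String entity = "publication";
            // Mirrors ExtractEntitiesFromHDFSJob.isEntityType: compare the text
            // after the first ':' with the requested entity type.
            boolean match = StringUtils.substringAfter(key, ":").equalsIgnoreCase(entity);
            System.out.println(match); // true
        }
    }
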
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
index 0291be47e..7c0967b2e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java
@@ -18,13 +18,13 @@ public class GraphMappingUtils {
     public final static Map<String, Class> types = Maps.newHashMap();
 
     static {
-        types.put("datasource", Datasource.class);
-        types.put("organization", Organization.class);
+        types.put("datasource", Datasource.class);
+        types.put("organization", Organization.class);
         types.put("project", Project.class);
-        types.put("dataset", Dataset.class);
-        types.put("otherresearchproduct", OtherResearchProduct.class);
-        types.put("software", Software.class);
-        types.put("publication", Publication.class);
+        types.put("dataset", Dataset.class);
+        types.put("otherresearchproduct", OtherResearchProduct.class);
+        types.put("software", Software.class);
+        types.put("publication", Publication.class);
         types.put("relation", Relation.class);
     }
diff --git a/pom.xml b/pom.xml
index 6f85886c0..658d8285f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,7 @@
 			<dependency>
 				<groupId>eu.dnetlib</groupId>
 				<artifactId>dnet-pace-core</artifactId>
-				<version>4.0.0-SNAPSHOT</version>
+				<version>4.0.0</version>
 			</dependency>
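
The types map above pairs each entity name handled by the new workflow with its
model class. A consumer of the extracted text files might use it for
deserialization; the sketch below assumes the lines are JSON-serialized
entities and that Jackson is on the classpath, neither of which this patch
itself establishes.

    package eu.dnetlib.dhp.graph;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class GraphRecordParseSketch {
        public static void main(String[] args) throws Exception {
            final ObjectMapper mapper = new ObjectMapper();
            final String line = "{\"id\":\"50|abc\"}"; // hypothetical serialized publication
            // Pick the model class registered for the entity type and deserialize.
            final Object record = mapper.readValue(line, GraphMappingUtils.types.get("publication"));
            System.out.println(record.getClass().getSimpleName()); // Publication
        }
    }
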