From 7ba586d2e587c117aacf50fb6e1e9f9b034df35c Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 17 Dec 2019 16:24:49 +0100 Subject: [PATCH] oozie workflow aimed to build the adjacency lists representation of the graph, needed to build the records to be indexed --- .../job-override.properties | 3 + dhp-workflows/dhp-graph-provision/pom.xml | 37 +++++++ .../java/eu/dnetlib/dhp/graph/EntityNode.java | 4 + .../dnetlib/dhp/graph/GraphMappingUtils.java | 23 ++++ .../eu/dnetlib/dhp/graph/RelatedEntity.java | 69 ++++++++++++ .../dhp/graph/SparkGraphIndexingJob.java | 102 ++++++++++++++++++ .../dhp/graph/input_graph_parameters.json | 5 + .../dhp/graph/oozie_app/config-default.xml | 26 +++++ .../dnetlib/dhp/graph/oozie_app/workflow.xml | 46 ++++++++ 9 files changed, 315 insertions(+) create mode 100644 dhp-workflows/dhp-graph-provision/job-override.properties create mode 100644 dhp-workflows/dhp-graph-provision/pom.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-provision/job-override.properties b/dhp-workflows/dhp-graph-provision/job-override.properties new file mode 100644 index 0000000000..31f7f88f5b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/job-override.properties @@ -0,0 +1,3 @@ +sparkDriverMemory=16G +sparkExecutorMemory=16G +hive_db_name=claudio \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml new file mode 100644 index 0000000000..d474637740 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -0,0 +1,37 @@ + + + + dhp-workflows + eu.dnetlib.dhp + 1.0.5-SNAPSHOT + + 4.0.0 + + dhp-graph-provision + + + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + eu.dnetlib.dhp + dhp-schemas + ${project.version} + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java new file mode 100644 index 0000000000..be1babae2c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/EntityNode.java @@ -0,0 +1,4 @@ +package eu.dnetlib.dhp.graph; + +public class EntityNode { +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java new file mode 100644 index 0000000000..ab19ff2b51 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphMappingUtils.java @@ -0,0 +1,23 @@ +package eu.dnetlib.dhp.graph; + +import com.google.common.collect.Maps; +import eu.dnetlib.dhp.schema.oaf.*; + +import java.util.Map; + +public class GraphMappingUtils { + + public final static Map types = Maps.newHashMap(); + + static { + types.put("datasource", Datasource.class); + types.put("organization", Organization.class); + types.put("project", Project.class); + types.put("dataset", Dataset.class); + types.put("otherresearchproduct", OtherResearchProduct.class); + types.put("software", Software.class); + types.put("publication", Publication.class); + types.put("relation", Relation.class); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java new file mode 100644 index 0000000000..dbab04f16e --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/RelatedEntity.java @@ -0,0 +1,69 @@ +package eu.dnetlib.dhp.graph; + +import java.io.Serializable; + +public class RelatedEntity implements Serializable { + + private String relType; + + private String subRelType; + + private String relClass; + + private String type; + + private String payload; + + public RelatedEntity(String relType, String subRelType, String relClass, String type, String payload) { + this.relType = relType; + this.subRelType = subRelType; + this.relClass = relClass; + this.type = type; + this.payload = payload; + } + + public String getRelType() { + return relType; + } + + public RelatedEntity setRelType(String relType) { + this.relType = relType; + return this; + } + + public String getSubRelType() { + return subRelType; + } + + public RelatedEntity setSubRelType(String subRelType) { + this.subRelType = subRelType; + return this; + } + + public String getRelClass() { + return relClass; + } + + public RelatedEntity setRelClass(String relClass) { + this.relClass = relClass; + return this; + } + + public String getType() { + return type; + } + + public RelatedEntity setType(String type) { + this.type = type; + return this; + } + + public String getPayload() { + return payload; + } + + public RelatedEntity setPayload(String payload) { + this.payload = payload; + return this; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java new file mode 100644 index 0000000000..04711efbd1 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/SparkGraphIndexingJob.java @@ -0,0 +1,102 @@ +package eu.dnetlib.dhp.graph; + +import com.google.common.collect.Sets; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityPayload; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import scala.Tuple2; +import scala.runtime.AbstractFunction1; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.substringAfter; +import static org.apache.commons.lang3.StringUtils.substringBefore; +import static org.apache.spark.sql.Encoders.bean; + +public class SparkGraphIndexingJob { + + private final static String ENTITY_NODES_PATH = "/tmp/entity_node"; + private static final long LIMIT = 100; + + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGraphIndexingJob.class.getResourceAsStream("/eu/dnetlib/dhp/graph/input_graph_parameters.json"))); + parser.parseArgument(args); + final SparkSession spark = SparkSession + .builder() + .appName(SparkGraphIndexingJob.class.getSimpleName()) + .master(parser.get("master")) + .config("hive.metastore.uris", parser.get("hive_metastore_uris")) + .config("spark.driver.cores", 1) + .config("spark.executor.cores", 1) + .config("spark.yarn.executor.memoryOverhead", "4G") + .config("spark.yarn.driver.memoryOverhead", "4G") + .enableHiveSupport() + .getOrCreate(); + + final String hiveDbName = parser.get("hive_db_name"); + + final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); + if (fs.exists(new Path(ENTITY_NODES_PATH))) { + fs.delete(new Path(ENTITY_NODES_PATH), true); + } + + spark + .sql(getJoinEntitiesSQL(hiveDbName)) + .transform(toEntityNode()) + /* + .map((MapFunction) r -> { + return null; + }, bean(String.class)) + */ + .rdd() + + .saveAsTextFile(ENTITY_NODES_PATH, GzipCodec.class); + } + + private static AbstractFunction1, Dataset> toEntityNode() { + return new AbstractFunction1, Dataset>() { + @Override + public Dataset apply(Dataset d) { + return d.map((MapFunction) r -> { + + final List res = r.getList(r.fieldIndex("related_entity")); + final byte[] payload = r.getAs("payload"); + return new EntityNode(r.getAs("id"), r.getAs("type"), new String(payload)) + .setRelatedEntities(res + .stream() + .map(re -> new Tuple2<>(substringBefore(re, "@@"), substringAfter(re, "@@"))) + .map(re -> new RelatedEntity(r.getAs("reltype"), r.getAs("subreltype"), r.getAs("relclass"), re._1(), re._2())) + .limit(LIMIT) + .collect(Collectors.toList())); + + }, bean(EntityNode.class)); + } + }; + } + + private static String getJoinEntitiesSQL(String hiveDbName) { + return String.format( + "SELECT " + + "E_s.id AS id, " + + "E_s.type AS type, " + + "E_s.payload AS payload, " + + "r.reltype AS reltype, r.subreltype AS subreltype, r.relclass AS relclass, " + + "collect_list(concat(E_t.type, '@@', E_t.payload)) AS related_entity " + + "FROM %s.entities " + "" /*"TABLESAMPLE(0.1 PERCENT) "*/ + "E_s " + + "LEFT JOIN %s.relation r ON (r.source = E_s.id) " + + "JOIN %s.entities E_t ON (E_t.id = r.target) \n" + + "GROUP BY E_s.id, E_s.type, E_s.payload, r.reltype, r.subreltype, r.relclass", hiveDbName, hiveDbName, hiveDbName); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json new file mode 100644 index 0000000000..613389d794 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -0,0 +1,5 @@ +[ + {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, + {"paramName":"h", "paramLongName":"hive_metastore_uris","paramDescription": "the hive metastore uris", "paramRequired": true}, + {"paramName":"db", "paramLongName":"hive_db_name", "paramDescription": "the target hive database name", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml new file mode 100644 index 0000000000..fcab9dd00a --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml @@ -0,0 +1,26 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hive_db_name + openaire + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml new file mode 100644 index 0000000000..473b697cd9 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -0,0 +1,46 @@ + + + + hive_db_name + the target hive database name + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + GraphIndexing + eu.dnetlib.dhp.graph.SparkGraphIndexingJob + dhp-graph-provision-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" + -mt yarn-cluster + --hive_db_name${hive_db_name} + --hive_metastore_uris${hive_metastore_uris} + + + + + + + \ No newline at end of file