diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java index 5eac12e5d..3ee72c318 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; +import static org.apache.spark.sql.functions.*; + import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntity; /** @@ -93,9 +95,10 @@ public class GraphJoiner_v2 implements Serializable { .union(otherresearchproduct) .union(software) .union(publication) - .repartition(20000) - .write() - .parquet(getOutPath() + "/entities"); + .repartition(7000) + .write() + .partitionBy("id") + .parquet(getOutPath() + "/entities"); Dataset> entities = getSpark() .read() @@ -108,29 +111,51 @@ public class GraphJoiner_v2 implements Serializable { t.setOaf(r.getAs("oaf")); return new Tuple2<>(t.getId(), t); - }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))); + }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) + .cache(); System.out.println("Entities, number of partitions: " + entities.rdd().getNumPartitions()); System.out.println("Entities schema:"); entities.printSchema(); + System.out.println("Entities count:" + entities.count()); -/* // reads the relationships - Dataset rels = readPathRelation(jsc, getInputPath()) + readPathRelation(jsc, getInputPath()) .groupByKey((MapFunction) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class)) - .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class)) + .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.kryo(Relation.class)) + .repartition(3000) + .write() + .partitionBy("source", "target") + .parquet(getOutPath() + "/relations"); + + Dataset rels = getSpark() + .read() + .load(getOutPath() + "/relations") + .map((MapFunction) r -> { + Relation rel = new Relation(); + rel.setSource(r.getAs("source")); + rel.setTarget(r.getAs("target")); + rel.setRelType(r.getAs("relType")); + rel.setSubRelType(r.getAs("subRelType")); + rel.setRelClass(r.getAs("relClass")); + rel.setDataInfo(r.getAs("dataInfo")); + rel.setCollectedFrom(r.getList(r.fieldIndex("collectedFrom"))); + return rel; + }, Encoders.kryo(Relation.class)) .cache(); System.out.println("Relation schema:"); - rels.printSchema(); + System.out.println("Relation, number of partitions: " + rels.rdd().getNumPartitions()); + System.out.println("Relation schema:"); + entities.printSchema(); + System.out.println("Relation count:" + rels.count()); + /* Dataset> relsByTarget = rels .map((MapFunction>) r -> new Tuple2<>(r.getTarget(), r), Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))); - System.out.println("Relation by target schema:"); - relsByTarget.printSchema(); - Dataset> bySource = relsByTarget + relsByTarget .joinWith(entities, relsByTarget.col("_1").equalTo(entities.col("_1")), "inner") .filter((FilterFunction, Tuple2>>) value -> value._2()._2().getDeleted() == false) .map((MapFunction, Tuple2>, EntityRelEntity>) t -> { @@ -139,12 +164,28 @@ public class GraphJoiner_v2 implements Serializable { e.setTarget(asRelatedEntity(t._2()._2())); return e; }, Encoders.kryo(EntityRelEntity.class)) - .map((MapFunction>) e -> new Tuple2<>(e.getRelation().getSource(), e), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); + .repartition(20000) + .write() + .parquet(getOutPath() + "/bySource"); - System.out.println("bySource schema"); + Dataset> bySource = getSpark() + .read() + .load(getOutPath() + "/bySource") + .map(new MapFunction() { + @Override + public EntityRelEntity call(Row value) throws Exception { + return null; + } + }, Encoders.kryo(EntityRelEntity.class)) + .map((MapFunction>) e -> new Tuple2<>(e.getRelation().getSource(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))) + + System.out.println("bySource schema"); bySource.printSchema(); + + + Dataset joined = entities .joinWith(bySource, entities.col("_1").equalTo(bySource.col("_1")), "left") .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml index b1a494ac4..c070d8338 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml @@ -27,4 +27,16 @@ spark2EventLogDir /user/spark/spark2ApplicationHistory + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + + oozieActionShareLibForSpark2 + spark2 + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index f4bd3f19e..194cd43c8 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -25,6 +25,20 @@ sparkExecutorCoresForIndexing number of cores used by single executor + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + spark2YarnHistoryServerAddress spark 2.* yarn history server address @@ -40,12 +54,8 @@ ${nameNode} - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} @@ -76,11 +86,11 @@ eu.dnetlib.dhp.oa.provision.SparkXmlRecordBuilderJob_v2 dhp-graph-provision-${projectVersion}.jar - --executor-cores ${sparkExecutorCoresForJoining} - --executor-memory ${sparkExecutorMemoryForJoining} + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} @@ -107,8 +117,8 @@ --executor-memory ${sparkExecutorMemoryForIndexing} --driver-memory=${sparkDriverMemoryForIndexing} --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}