From 202f6e62ff0002573ff69be4dbe570b7b6569989 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 24 Jun 2020 15:47:06 +0200 Subject: [PATCH] Splitted join wf --- ...JoinEntitiesJob.java => JoinStep1Job.java} | 53 +++++------- .../dnetlib/dhp/broker/oa/JoinStep2Job.java | 79 ++++++++++++++++++ .../dnetlib/dhp/broker/oa/JoinStep3Job.java | 79 ++++++++++++++++++ .../dnetlib/dhp/broker/oa/JoinStep4Job.java | 79 ++++++++++++++++++ .../dhp/broker/oa/PrepareGroupsJob.java | 4 +- .../broker/oa/PrepareRelatedDatasetsJob.java | 1 + .../broker/oa/PrepareRelatedProjectsJob.java | 5 +- .../oa/PrepareRelatedPublicationsJob.java | 5 +- .../broker/oa/PrepareRelatedSoftwaresJob.java | 5 +- .../oa/generate_all/oozie_app/workflow.xml | 80 ++++++++++++++++++- .../broker/oa/partial/oozie_app/workflow.xml | 55 +++++++++---- 11 files changed, 379 insertions(+), 66 deletions(-) rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/{JoinEntitiesJob.java => JoinStep1Job.java} (52%) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java similarity index 52% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java index 868faa8f56..1be782a127 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinEntitiesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java @@ -11,7 +11,7 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.expressions.Aggregator; +import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,15 +22,15 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator; import scala.Tuple2; -public class JoinEntitiesJob { +public class JoinStep1Job { - private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class); + private static final Logger log = LoggerFactory.getLogger(JoinStep1Job.class); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - JoinEntitiesJob.class + JoinStep1Job.class .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); @@ -43,7 +43,7 @@ public class JoinEntitiesJob { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); - final String joinedEntitiesPath = workingPath + "/joinedEntities"; + final String joinedEntitiesPath = workingPath + "/joinedEntities_step1"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -52,39 +52,28 @@ public class JoinEntitiesJob { ClusterUtils.removeDir(spark, joinedEntitiesPath); - final Dataset r0 = ClusterUtils + final Dataset sources = ClusterUtils .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); - final Dataset r1 = join( - r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class), - new RelatedProjectAggregator()); - // final Dataset r2 = join( - // r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class), new - // RelatedDatasetAggregator()); - // final Dataset r3 = join( - // r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class), new - // RelatedPublicationAggregator()); - // final Dataset r4 = join( - // r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class), new - // RelatedSoftwareAggregator()); + final Dataset typedRels = ClusterUtils + .readPath(spark, workingPath + "/relatedProjects", RelatedProject.class); - r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath); + final TypedColumn, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() + .toColumn(); + + sources + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), + Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .json(joinedEntitiesPath); }); } - private static Dataset join(final Dataset sources, - final Dataset typedRels, - final Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> aggr) { - - return sources - .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") - .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) - .agg(aggr.toColumn()) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); - - } - } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java new file mode 100644 index 0000000000..103d795530 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java @@ -0,0 +1,79 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftwareAggregator; +import scala.Tuple2; + +public class JoinStep2Job { + + private static final Logger log = LoggerFactory.getLogger(JoinStep2Job.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinStep2Job.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String joinedEntitiesPath = workingPath + "/joinedEntities_step2"; + log.info("joinedEntitiesPath: {}", joinedEntitiesPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, joinedEntitiesPath); + + final Dataset sources = ClusterUtils + .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); + + final Dataset typedRels = ClusterUtils + .readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class); + + final TypedColumn, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator() + .toColumn(); + + sources + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), + Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .json(joinedEntitiesPath); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java new file mode 100644 index 0000000000..ceb199dc45 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java @@ -0,0 +1,79 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasetAggregator; +import scala.Tuple2; + +public class JoinStep3Job { + + private static final Logger log = LoggerFactory.getLogger(JoinStep3Job.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinStep3Job.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String joinedEntitiesPath = workingPath + "/joinedEntities_step3"; + log.info("joinedEntitiesPath: {}", joinedEntitiesPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, joinedEntitiesPath); + + final Dataset sources = ClusterUtils + .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class); + + final Dataset typedRels = ClusterUtils + .readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class); + + final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator() + .toColumn(); + + sources + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), + Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .json(joinedEntitiesPath); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java new file mode 100644 index 0000000000..3067810dd5 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java @@ -0,0 +1,79 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublicationAggregator; +import scala.Tuple2; + +public class JoinStep4Job { + + private static final Logger log = LoggerFactory.getLogger(JoinStep4Job.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinStep4Job.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String joinedEntitiesPath = workingPath + "/joinedEntities_step4"; + log.info("joinedEntitiesPath: {}", joinedEntitiesPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, joinedEntitiesPath); + + final Dataset sources = ClusterUtils + .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class); + + final Dataset typedRels = ClusterUtils + .readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class); + + final TypedColumn, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator() + .toColumn(); + + sources + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), + Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) + .write() + .mode(SaveMode.Overwrite) + .json(joinedEntitiesPath); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index 934ddff59a..47a9f36c5f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -32,7 +32,7 @@ public class PrepareGroupsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - JoinEntitiesJob.class + PrepareGroupsJob.class .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); @@ -58,7 +58,7 @@ public class PrepareGroupsJob { ClusterUtils.removeDir(spark, groupsPath); final Dataset results = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities", OaBrokerMainEntity.class); + .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class); final Dataset mergedRels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index fe9c87e877..6e006ccf02 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -61,6 +61,7 @@ public class PrepareRelatedDatasetsJob { final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index 3ae240982d..0af5d21b78 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; @@ -29,8 +27,6 @@ public class PrepareRelatedProjectsJob { private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -67,6 +63,7 @@ public class PrepareRelatedProjectsJob { final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index 8814ef3e09..84752776e2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; @@ -28,8 +26,6 @@ public class PrepareRelatedPublicationsJob { private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -68,6 +64,7 @@ public class PrepareRelatedPublicationsJob { final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index 0704fb44a8..0ad753a974 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; @@ -29,8 +27,6 @@ public class PrepareRelatedSoftwaresJob { private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -67,6 +63,7 @@ public class PrepareRelatedSoftwaresJob { final Dataset rels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 18e2eedca8..8752200ffa 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -216,14 +216,86 @@ - + - + yarn cluster - JoinEntitiesJob - eu.dnetlib.dhp.broker.oa.JoinEntitiesJob + JoinStep1 + eu.dnetlib.dhp.broker.oa.JoinStep1Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep2 + eu.dnetlib.dhp.broker.oa.JoinStep2Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep3 + eu.dnetlib.dhp.broker.oa.JoinStep3Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep4 + eu.dnetlib.dhp.broker.oa.JoinStep4Job dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 1ccdef929f..26fa429e61 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -73,19 +73,19 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - JoinEntitiesJob - eu.dnetlib.dhp.broker.oa.JoinEntitiesJob + JoinStep1 + eu.dnetlib.dhp.broker.oa.JoinStep1Job dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -100,16 +100,16 @@ --graphPath${graphInputPath} --workingPath${workingPath} - + - - + + yarn cluster - PrepareGroupsJob - eu.dnetlib.dhp.broker.oa.PrepareGroupsJob + JoinStep2 + eu.dnetlib.dhp.broker.oa.JoinStep2Job dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -124,16 +124,16 @@ --graphPath${graphInputPath} --workingPath${workingPath} - + - - + + yarn cluster - GenerateEventsJob - eu.dnetlib.dhp.broker.oa.GenerateEventsJob + JoinStep3 + eu.dnetlib.dhp.broker.oa.JoinStep3Job dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -145,9 +145,32 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep4 + eu.dnetlib.dhp.broker.oa.JoinStep4Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} --workingPath${workingPath} - --isLookupUrl${isLookupUrl} - --dedupConfProfile${dedupConfProfId}