From af2f7705fc631c7dbd7d0f7918f21e525e507ab2 Mon Sep 17 00:00:00 2001
From: "michele.artini"
Date: Tue, 23 Jun 2020 08:37:35 +0200
Subject: [PATCH] partial refactoring of some joins

---
 .../broker/oa/PrepareRelatedDatasetsJob.java  |  16 +-
 .../broker/oa/PrepareRelatedProjectsJob.java  |   4 +-
 .../oa/PrepareRelatedPublicationsJob.java     |  29 +--
 .../broker/oa/PrepareRelatedSoftwaresJob.java |  28 ++-
 .../broker/oa/PrepareSimpleEntititiesJob.java |   1 +
 .../dhp/broker/oa/util/ClusterUtils.java      |   4 +
 .../oa/generate_all/oozie_app/workflow.xml    |   9 +-
 .../oa/partial/oozie_app/config-default.xml   |  18 ++
 .../broker/oa/partial/oozie_app/workflow.xml  | 215 ++++++++++++++++++
 9 files changed, 289 insertions(+), 35 deletions(-)
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
index edf9b9a7e6..bcd333d564 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
@@ -13,6 +13,7 @@ import org.apache.spark.sql.SaveMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
 import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
@@ -52,18 +53,23 @@ public class PrepareRelatedDatasetsJob {
 
 		ClusterUtils.removeDir(spark, relsPath);
 
-		final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = ClusterUtils
-			.readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+		final Dataset<OaBrokerRelatedDataset> datasets = ClusterUtils
+			.readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
+			.filter(d -> !ClusterUtils.isDedupRoot(d.getId()))
+			.map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class));
 
-		final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
+		final Dataset<Relation> rels = ClusterUtils
+			.readPath(spark, graphPath + "/relation", Relation.class)
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
 		rels
-			.joinWith(datasets, datasets.col("id").equalTo(rels.col("target")), "inner")
+			.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
 			.map(
 				t -> new RelatedDataset(
 					t._1.getSource(),
 					t._1.getRelType(),
-					ConversionUtils.oafDatasetToBrokerDataset(t._2)),
+					t._2),
 				Encoders.bean(RelatedDataset.class))
 			.write()
 			.mode(SaveMode.Overwrite)
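The refactoring above follows one pattern worth noting: the OAF entity is filtered of dedup roots and projected to its broker form before the join, so the shuffle moves the slim OaBrokerRelatedDataset bean instead of the full OAF entity, and the join key accordingly changes from the OAF column "id" to the broker bean's "openaireId". A minimal sketch of what the join produces, assuming the types shown in this patch (the Tuple2 handling is standard Spark joinWith behavior; the local variable names are illustrative, not part of the commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import scala.Tuple2;

    // joinWith pairs each matching relation with its already-converted broker bean.
    final Dataset<Tuple2<Relation, OaBrokerRelatedDataset>> joined = rels
        .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner");

    // t._1 is the Relation, t._2 the OaBrokerRelatedDataset; no per-row conversion remains in the join.
    final Dataset<RelatedDataset> related = joined
        .map(
            t -> new RelatedDataset(t._1.getSource(), t._1.getRelType(), t._2),
            Encoders.bean(RelatedDataset.class));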
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
index 00957972ad..0460bfabb2 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
@@ -62,7 +62,9 @@ public class PrepareRelatedProjectsJob {
 
 		final Dataset<Relation> rels = ClusterUtils
 			.readPath(spark, graphPath + "/relation", Relation.class)
-			.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT));
+			.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
 		rels
 			.joinWith(projects, projects.col("id").equalTo(rels.col("target")), "inner")
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
index 945fd9ed77..f3db509bb7 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
 import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
@@ -31,9 +32,8 @@ public class PrepareRelatedPublicationsJob {
 
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
-				.toString(
-					PrepareRelatedPublicationsJob.class
-						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+				.toString(PrepareRelatedPublicationsJob.class
+					.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
 
 		parser.parseArgument(args);
@@ -57,19 +57,22 @@ public class PrepareRelatedPublicationsJob {
 
 		ClusterUtils.removeDir(spark, relsPath);
 
-		final Dataset<Publication> pubs = ClusterUtils
-			.readPath(spark, graphPath + "/publication", Publication.class);
+		final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils
+			.readPath(spark, graphPath + "/publication", Publication.class)
+			.filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
+			.map(ConversionUtils::oafPublicationToBrokerPublication, Encoders.bean(OaBrokerRelatedPublication.class));
 
-		final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
+		final Dataset<Relation> rels = ClusterUtils
+			.readPath(spark, graphPath + "/relation", Relation.class)
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
 		rels
-			.joinWith(pubs, pubs.col("id").equalTo(rels.col("target")), "inner")
-			.map(
-				t -> new RelatedPublication(
-					t._1.getSource(),
-					t._1.getRelType(),
-					ConversionUtils.oafPublicationToBrokerPublication(t._2)),
-				Encoders.bean(RelatedPublication.class))
+			.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
+			.map(t -> new RelatedPublication(
+				t._1.getSource(),
+				t._1.getRelType(),
+				t._2), Encoders.bean(RelatedPublication.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.json(relsPath);
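All of these jobs serialize their output through Encoders.bean(...), which only works if RelatedPublication and the other wrapper types follow JavaBean conventions: a public no-arg constructor plus getter/setter pairs for every persisted field. A sketch of the shape such a wrapper must have; the field names here are inferred from the constructor calls above, and the real class elsewhere in this module may differ:

    import java.io.Serializable;

    // Illustrative bean shape only; not the actual RelatedPublication source.
    public class RelatedPublication implements Serializable {

        private String source;
        private String relType;
        private OaBrokerRelatedPublication relPublication;

        public RelatedPublication() {} // required by Encoders.bean

        public RelatedPublication(final String source, final String relType,
            final OaBrokerRelatedPublication relPublication) {
            this.source = source;
            this.relType = relType;
            this.relPublication = relPublication;
        }

        public String getSource() { return source; }
        public void setSource(final String source) { this.source = source; }
        public String getRelType() { return relType; }
        public void setRelType(final String relType) { this.relType = relType; }
        public OaBrokerRelatedPublication getRelPublication() { return relPublication; }
        public void setRelPublication(final OaBrokerRelatedPublication p) { this.relPublication = p; }
    }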
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
index edb8dc1c3a..ffc3a8c658 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
@@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
 import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
@@ -31,9 +32,8 @@ public class PrepareRelatedSoftwaresJob {
 
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
-				.toString(
-					PrepareRelatedSoftwaresJob.class
-						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+				.toString(PrepareRelatedSoftwaresJob.class
+					.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
 
 		parser.parseArgument(args);
@@ -57,18 +57,22 @@ public class PrepareRelatedSoftwaresJob {
 
 		ClusterUtils.removeDir(spark, relsPath);
 
-		final Dataset<Software> softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class);
+		final Dataset<OaBrokerRelatedSoftware> softwares = ClusterUtils
+			.readPath(spark, graphPath + "/software", Software.class)
+			.filter(sw -> !ClusterUtils.isDedupRoot(sw.getId()))
+			.map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class));
 
-		final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
+		final Dataset<Relation> rels = ClusterUtils
+			.readPath(spark, graphPath + "/relation", Relation.class)
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
 		rels
-			.joinWith(softwares, softwares.col("id").equalTo(rels.col("target")), "inner")
-			.map(
-				t -> new RelatedSoftware(
-					t._1.getSource(),
-					t._1.getRelType(),
-					ConversionUtils.oafSoftwareToBrokerSoftware(t._2)),
-				Encoders.bean(RelatedSoftware.class))
+			.joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner")
+			.map(t -> new RelatedSoftware(
+				t._1.getSource(),
+				t._1.getRelType(),
+				t._2), Encoders.bean(RelatedSoftware.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.json(relsPath);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
index 213003db28..1b9c279fd4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
@@ -74,6 +74,7 @@ public class PrepareSimpleEntititiesJob {
 
 		return ClusterUtils
 			.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
+			.filter(r -> !ClusterUtils.isDedupRoot(r.getId()))
 			.filter(r -> r.getDataInfo().getDeletedbyinference())
 			.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
 	}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
index 15a1ddd880..968bde881b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
@@ -32,4 +32,8 @@ public class ClusterUtils {
 			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
 	}
 
+	public static boolean isDedupRoot(final String id) {
+		return id.contains("dedup_wf_");
+	}
+
 }
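The new isDedupRoot helper is a plain substring test: in the OpenAIRE graph, identifiers of dedup root records carry the dedup workflow marker (for example 50|dedup_wf_001::<md5>), so any id containing "dedup_wf_" is treated as a root and excluded from both sides of the joins above. Two illustrative calls; the ids below are made-up examples following the OpenAIRE id layout:

    // Made-up ids, for illustration only.
    ClusterUtils.isDedupRoot("50|dedup_wf_001::ab12cd34ef56");   // true  -> record is skipped
    ClusterUtils.isDedupRoot("50|doi_________::ab12cd34ef56");   // false -> record is kept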
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 9783fcab60..bec6f221dd 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -88,11 +88,11 @@
@@ -119,6 +119,7 @@
@@ -190,7 +191,7 @@
@@ -214,7 +215,7 @@
[The XML bodies of these four hunks were lost when the markup was stripped during extraction; only the hunk headers and stray "yarn" values survive, so the original element content is not reproduced here.]
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/config-default.xml
new file mode 100644
index 0000000000..2e0ed9aeea
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+	<property>
+		<name>jobTracker</name>
+		<value>yarnRM</value>
+	</property>
+	<property>
+		<name>nameNode</name>
+		<value>hdfs://nameservice1</value>
+	</property>
+	<property>
+		<name>oozie.use.system.libpath</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>oozie.action.sharelib.for.spark</name>
+		<value>spark2</value>
+	</property>
+</configuration>
\ No newline at end of file
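Each pair of <arg> elements in the Oozie actions below is consumed by the corresponding job's main() via ArgumentApplicationParser, as the Java diffs above show. A condensed sketch of that shared entry-point pattern, assuming the common_params.json descriptor used by these jobs (the local variable names are illustrative):

    // Shared entry-point pattern of the Prepare*Job classes in this patch.
    final ArgumentApplicationParser parser = new ArgumentApplicationParser(
        IOUtils
            .toString(PrepareRelatedPublicationsJob.class
                .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
    parser.parseArgument(args);

    // --graphPath and --workingPath arrive as the <arg> pairs of the Oozie action.
    final String graphPath = parser.get("graphPath");
    final String workingPath = parser.get("workingPath");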
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
new file mode 100644
index 0000000000..2539105955
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -0,0 +1,215 @@
+<workflow-app name="create broker events - partial" xmlns="uri:oozie:workflow:0.5">
+
+	<parameters>
+		<property>
+			<name>graphInputPath</name>
+			<description>the path where the graph is stored</description>
+		</property>
+		<property>
+			<name>workingPath</name>
+			<description>the path where the generated data will be stored</description>
+		</property>
+		<property>
+			<name>isLookupUrl</name>
+			<description>the address of the lookUp service</description>
+		</property>
+		<property>
+			<name>dedupConfProfId</name>
+			<description>the id of a valid Dedup Configuration Profile</description>
+		</property>
+		<property>
+			<name>sparkDriverMemory</name>
+			<description>memory for driver process</description>
+		</property>
+		<property>
+			<name>sparkExecutorMemory</name>
+			<description>memory for individual executor</description>
+		</property>
+		<property>
+			<name>sparkExecutorCores</name>
+			<description>number of cores used by single executor</description>
+		</property>
+		<property>
+			<name>oozieActionShareLibForSpark2</name>
+			<description>oozie action sharelib for spark 2.*</description>
+		</property>
+		<property>
+			<name>spark2ExtraListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+			<description>spark 2.* extra listeners classname</description>
+		</property>
+		<property>
+			<name>spark2SqlQueryExecutionListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+			<description>spark 2.* sql query execution listeners classname</description>
+		</property>
+		<property>
+			<name>spark2YarnHistoryServerAddress</name>
+			<description>spark 2.* yarn history server address</description>
+		</property>
+		<property>
+			<name>spark2EventLogDir</name>
+			<description>spark 2.* event log dir location</description>
+		</property>
+	</parameters>
+
+	<global>
+		<job-tracker>${jobTracker}</job-tracker>
+		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>mapreduce.job.queuename</name>
+				<value>${queueName}</value>
+			</property>
+			<property>
+				<name>oozie.launcher.mapred.job.queue.name</name>
+				<value>${oozieLauncherQueueName}</value>
+			</property>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
+	</global>
+
+	<start to="prepare_related_publications"/>
+
+	<kill name="Kill">
+		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+	</kill>
+
+	<action name="prepare_related_publications">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>PrepareRelatedPublicationsJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+		</spark>
+		<ok to="prepare_related_datasets"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="prepare_related_datasets">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>PrepareRelatedDatasetsJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+		</spark>
+		<ok to="join_entities"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="join_entities">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>JoinEntitiesJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+		</spark>
+		<ok to="prepare_groups"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="prepare_groups">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>PrepareGroupsJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+		</spark>
+		<ok to="generate_events"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="generate_events">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>GenerateEventsJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
+		</spark>
+		<ok to="End"/>
+		<error to="Kill"/>
+	</action>
+
+	<end name="End"/>
+
+</workflow-app>
\ No newline at end of file
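One loose end this partial refactoring leaves open: the identical read-and-filter chain over graphPath + "/relation" is now repeated in four jobs. A natural follow-up would hoist it into ClusterUtils next to isDedupRoot. A hypothetical helper method for that class, shown only as a possible next step, not part of this commit:

    // Hypothetical ClusterUtils addition; the four jobs above could then share it.
    public static Dataset<Relation> readRelations(final SparkSession spark, final String graphPath) {
        return readPath(spark, graphPath + "/relation", Relation.class)
            .filter(r -> !isDedupRoot(r.getSource()))
            .filter(r -> !isDedupRoot(r.getTarget()));
    }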