From e725c88ebb6eedea235abfc07af932756c9e2397 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 29 Jul 2021 13:03:16 +0200 Subject: [PATCH 1/3] [raw_all] patching relation identifier phase to be run at the end, i.e. includes also claimed relations --- .../oa/graph/raw_all/oozie_app/workflow.xml | 75 +++++++++---------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index e7320de3b..321ca4090 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -548,42 +548,7 @@ - - - - - - ${(shouldPatchRelations eq "true") and - (fs:exists(concat(concat(wf:conf('nameNode'),'/'),wf:conf('idMappingPath'))) eq "true")} - - - - - - - - yarn - cluster - PatchRelations - eu.dnetlib.dhp.oa.graph.raw.PatchRelationsApplication - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --graphBasePath${workingDir}/graph_raw - --workingDir${workingDir}/patch_relations - --idMappingPath${idMappingPath} - - - - + @@ -596,7 +561,6 @@ - yarn @@ -805,7 +769,42 @@ - + + + + + + ${(shouldPatchRelations eq "true") and + (fs:exists(concat(concat(wf:conf('nameNode'),'/'),wf:conf('idMappingPath'))) eq "true")} + + + + + + + + yarn + cluster + PatchRelations + eu.dnetlib.dhp.oa.graph.raw.PatchRelationsApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --graphBasePath${graphOutputPath} + --workingDir${workingDir}/patch_relations + --idMappingPath${idMappingPath} + + + + \ No newline at end of file From 6e3554a45e4ac1598b3a6177c9d3a85f687b7325 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 29 Jul 2021 13:56:37 +0200 Subject: [PATCH 2/3] [provision] lowercase relation filter --- .../java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 7d53d3554..b3f785492 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -10,6 +10,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -81,6 +82,7 @@ public class PrepareRelationsJob { Set relationFilter = Optional .ofNullable(parser.get("relationFilter")) + .map(String::toLowerCase) .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) .orElse(new HashSet<>()); log.info("relationFilter: {}", relationFilter); @@ -130,7 +132,7 @@ public class PrepareRelationsJob { JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath) .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter(rel -> relationFilter.contains(rel.getRelClass()) == false); + .filter(rel -> relationFilter.contains(StringUtils.lowerCase(rel.getRelClass())) == false); JavaRDD pruned = pruneRels( pruneRels( From 9bc4fd3b69981716c6a6b96b1234871539bf2f79 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 30 Jul 2021 10:34:05 +0200 Subject: [PATCH 3/3] Patch FCT relations - fixed issue with join --- .../dhp/oa/graph/raw/PatchRelationsApplication.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java index dddc53bc8..5bbc8a975 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java @@ -82,7 +82,7 @@ public class PatchRelationsApplication { log.info("relations: {}", rels.count()); log.info("idMapping: {}", idMapping.count()); - rels + Dataset fj = rels .joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left") .map((MapFunction, Relation>) t -> { final Relation r = t._1(); @@ -90,8 +90,9 @@ public class PatchRelationsApplication { .map(RelationIdMapping::getNewId) .ifPresent(r::setSource); return r; - }, Encoders.bean(Relation.class)) - .joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "left") + }, Encoders.bean(Relation.class)); + + fj.joinWith(idMapping, fj.col("target").equalTo(idMapping.col("oldId")), "left") .map((MapFunction, Relation>) t -> { final Relation r = t._1(); Optional.ofNullable(t._2())