From 668ac25224c093b6b06bbb7e6c51227f0a5c6c14 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 15 Nov 2021 17:02:45 +0100
Subject: [PATCH 1/3] [graph resolution] using existing argument parser file
 name

---
 .../dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala
index afd195ed0..316b8afed 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/resolution/SparkResolveEntities.scala
@@ -19,7 +19,7 @@ object SparkResolveEntities {
   def main(args: Array[String]): Unit = {
     val log: Logger = LoggerFactory.getLogger(getClass)
     val conf: SparkConf = new SparkConf()
-    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_params.json")))
+    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_entities_params.json")))
     parser.parseArgument(args)
 
     val spark: SparkSession = SparkSession
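Note, not part of the series: PATCH 1/3 only swaps the name of the classpath resource handed to ArgumentApplicationParser, so the simplest regression check is confirming that the renamed parameter definition actually resolves. The Scala sketch below is hypothetical; only the resource path comes from the patch, while the object name, charset, and require-based check are illustrative assumptions.

    import org.apache.commons.io.IOUtils

    object CheckResolveEntitiesParams {
      def main(args: Array[String]): Unit = {
        val path = "/eu/dnetlib/dhp/oa/graph/resolution/resolve_entities_params.json"
        val in = getClass.getResourceAsStream(path)
        // a null stream reproduces the failure mode the patch fixes:
        // the parser definition file missing from the classpath
        require(in != null, s"missing classpath resource: $path")
        println(IOUtils.toString(in, "UTF-8"))
      }
    }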
From bafa2990f31b4c39af95be9393f153e1b4823cfc Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 15 Nov 2021 17:07:16 +0100
Subject: [PATCH 2/3] code formatting

---
 .../opencitations/CreateActionSetSparkJob.java   |  8 ++--
 .../opencitations/CreateOpenCitationsASTest.java | 38 +++++++++----------
 2 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java
index eeb86a8ff..ea5fea96f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java
@@ -65,10 +65,10 @@ public class CreateActionSetSparkJob implements Serializable {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath {}", outputPath);
 
-		final boolean shouldDuplicateRels =
-			Optional.ofNullable(parser.get("shouldDuplicateRels"))
-				.map(Boolean::valueOf)
-				.orElse(Boolean.FALSE);
+		final boolean shouldDuplicateRels = Optional
+			.ofNullable(parser.get("shouldDuplicateRels"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.FALSE);
 
 		SparkConf conf = new SparkConf();
 		runWithSparkSession(
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java
index 7567f855b..5a04dcefe 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateOpenCitationsASTest.java
@@ -84,8 +84,8 @@ public class CreateOpenCitationsASTest {
 				new String[] {
 					"-isSparkSessionManaged",
 					Boolean.FALSE.toString(),
-					"-shouldDuplicateRels",
-					Boolean.TRUE.toString(),
+					"-shouldDuplicateRels",
+					Boolean.TRUE.toString(),
 					"-inputPath",
 					inputPath,
 					"-outputPath",
 					workingDir.toString() + "/actionSet"
@@ -101,7 +101,7 @@ public class CreateOpenCitationsASTest {
 
 		assertEquals(60, tmp.count());
 
-		// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+		// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
 
 	}
 
@@ -109,31 +109,31 @@ public class CreateOpenCitationsASTest {
 	void testNumberofRelations2() throws Exception {
 
 		String inputPath = getClass
-			.getResource(
-				"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
-			.getPath();
+			.getResource(
+				"/eu/dnetlib/dhp/actionmanager/opencitations/inputFiles")
+			.getPath();
 
 		CreateActionSetSparkJob
-			.main(
-				new String[] {
-					"-isSparkSessionManaged",
-					Boolean.FALSE.toString(),
-					"-inputPath",
-					inputPath,
-					"-outputPath",
-					workingDir.toString() + "/actionSet"
-				});
+			.main(
+				new String[] {
+					"-isSparkSessionManaged",
+					Boolean.FALSE.toString(),
+					"-inputPath",
+					inputPath,
+					"-outputPath",
+					workingDir.toString() + "/actionSet"
+				});
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
 		JavaRDD<Relation> tmp = sc
-			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
-			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
-			.map(aa -> ((Relation) aa.getPayload()));
+			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+			.map(aa -> ((Relation) aa.getPayload()));
 
 		assertEquals(44, tmp.count());
 
-		// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
+		// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
 
 	}

From 0a727d325d5d0ebd4fb9ed3f29482cf6e472b7c2 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Tue, 16 Nov 2021 08:43:41 +0100
Subject: [PATCH 3/3] [dedup] increased number of partitions in the
 consistency phase

---
 .../dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
index 83a47ea6c..4ea003926 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml
@@ -89,7 +89,7 @@
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
+                --conf spark.sql.shuffle.partitions=15000
             </spark-opts>
             <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
             <arg>--o</arg><arg>${graphOutputPath}</arg>
@@ -114,7 +114,7 @@
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
+                --conf spark.sql.shuffle.partitions=15000
            </spark-opts>
            <arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
            <arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
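Note, not part of the series: PATCH 3/3 raises spark.sql.shuffle.partitions from 7680 to 15000 in both spark-opts blocks. A larger partition count spreads each wide transformation over more, smaller shuffle tasks, which is the usual remedy when a phase hits executor memory pressure or oversized spills; the commit message does not state the motivation, so that reading is an inference. The Scala sketch below only shows the same setting applied programmatically; master, app name, and session handling are illustrative assumptions.

    import org.apache.spark.sql.SparkSession

    object ShufflePartitionsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local[*]") // illustrative; the real workflow runs on YARN via Oozie
          .appName("consistency-shuffle-partitions")
          .config("spark.sql.shuffle.partitions", "15000")
          .getOrCreate()

        // every wide transformation (join, groupBy, distinct) in this session
        // now shuffles into 15000 partitions
        println(spark.conf.get("spark.sql.shuffle.partitions"))
        spark.stop()
      }
    }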