From dc80ab14d3ad408c136089299de620971c1c6c91 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 12 Sep 2023 14:34:28 +0200 Subject: [PATCH 1/2] [graph dedup] consistency wf should not remove the relations while dispatching the entities --- .../dhp/oa/merge/DispatchEntitiesSparkJob.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java index 4d2ccb178..cf0a183d7 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java @@ -47,17 +47,14 @@ public class DispatchEntitiesSparkJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); + boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible")); log.info("filterInvisible: {}", filterInvisible); SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchEntities(spark, inputPath, outputPath, filterInvisible); - }); + spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible)); } private static void dispatchEntities( @@ -72,7 +69,9 @@ public class DispatchEntitiesSparkJob { String entityType = entry.getKey(); Class clazz = entry.getValue(); + final String entityPath = outputPath + "/" + entityType; if (!entityType.equalsIgnoreCase("relation")) { + HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration()); Dataset entityDF = spark .read() .schema(Encoders.bean(clazz).schema()) @@ -91,7 +90,7 @@ public class DispatchEntitiesSparkJob { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "/" + entityType); + .json(entityPath); } }); } From 2aed5a74be100c9615c7b248333271de2b71678d Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Tue, 12 Sep 2023 22:31:50 +0300 Subject: [PATCH 2/2] Run CC and RAM sequentieally in dhp-impact-indicators WF --- .../impact_indicators/oozie_app/workflow.xml | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 0d7d29bfe..e43e7cf14 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -39,7 +39,8 @@ - ${wf:conf('resume') eq "rankings-start"} + ${wf:conf('resume') eq "cc"} + ${wf:conf('resume') eq "ram"} ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} @@ -89,18 +90,11 @@ ${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py - + - - - - - - - @@ -129,7 +123,7 @@ ${wfAppPath}/bip-ranker/CC.py#CC.py - + @@ -165,14 +159,11 @@ ${wfAppPath}/bip-ranker/TAR.py#TAR.py - + - - -