diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java index 4d2ccb178..cf0a183d7 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java @@ -47,17 +47,14 @@ public class DispatchEntitiesSparkJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); + boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible")); log.info("filterInvisible: {}", filterInvisible); SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, - spark -> { - HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchEntities(spark, inputPath, outputPath, filterInvisible); - }); + spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible)); } private static void dispatchEntities( @@ -72,7 +69,9 @@ public class DispatchEntitiesSparkJob { String entityType = entry.getKey(); Class clazz = entry.getValue(); + final String entityPath = outputPath + "/" + entityType; if (!entityType.equalsIgnoreCase("relation")) { + HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration()); Dataset entityDF = spark .read() .schema(Encoders.bean(clazz).schema()) @@ -91,7 +90,7 @@ public class DispatchEntitiesSparkJob { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "/" + entityType); + .json(entityPath); } }); } diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 0d7d29bfe..e43e7cf14 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -39,7 +39,8 @@ - ${wf:conf('resume') eq "rankings-start"} + ${wf:conf('resume') eq "cc"} + ${wf:conf('resume') eq "ram"} ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} @@ -89,18 +90,11 @@ ${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py - + - - - - - - - @@ -129,7 +123,7 @@ ${wfAppPath}/bip-ranker/CC.py#CC.py - + @@ -165,14 +159,11 @@ ${wfAppPath}/bip-ranker/TAR.py#TAR.py - + - - -