diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
index 4d2ccb178..cf0a183d7 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java
@@ -47,17 +47,14 @@ public class DispatchEntitiesSparkJob {
 		String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
-		boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible"));
+		boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
 		log.info("filterInvisible: {}", filterInvisible);
 
 		SparkConf conf = new SparkConf();
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
-			spark -> {
-				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
-				dispatchEntities(spark, inputPath, outputPath, filterInvisible);
-			});
+			spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible));
 	}
 
 	private static void dispatchEntities(
@@ -72,7 +69,9 @@ public class DispatchEntitiesSparkJob {
 				String entityType = entry.getKey();
 				Class clazz = entry.getValue();
 
+				final String entityPath = outputPath + "/" + entityType;
 				if (!entityType.equalsIgnoreCase("relation")) {
+					HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration());
 					Dataset entityDF = spark
 						.read()
 						.schema(Encoders.bean(clazz).schema())
@@ -91,7 +90,7 @@ public class DispatchEntitiesSparkJob {
 						.write()
 						.mode(SaveMode.Overwrite)
 						.option("compression", "gzip")
-						.json(outputPath + "/" + entityType);
+						.json(entityPath);
 				}
 			});
 	}
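
Reviewer note: the effect of relocating the HdfsSupport.remove call is that cleanup now happens per entity subdirectory, immediately before that entity is rewritten, instead of wiping the whole outputPath up front, which would also have discarded the "relation" dataset that this job deliberately skips. Below is a minimal sketch of the relocated cleanup, assuming HdfsSupport.remove wraps a recursive delete on the Hadoop FileSystem API; the class and method names here are illustrative, not the actual HdfsSupport internals.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PerEntityCleanupSketch {

	// Hypothetical stand-in for HdfsSupport.remove: recursively delete one
	// entity's output subdirectory, if present, leaving sibling directories
	// (notably "relation") untouched.
	static void removeEntityDir(String outputPath, String entityType, Configuration hadoopConf)
		throws IOException {
		Path entityPath = new Path(outputPath + "/" + entityType);
		FileSystem fs = entityPath.getFileSystem(hadoopConf);
		if (fs.exists(entityPath)) {
			fs.delete(entityPath, true); // true = delete recursively
		}
	}
}

The Boolean.parseBoolean change is behavior-preserving: both it and Boolean.valueOf map any input other than a case-insensitive "true", including null, to false; parseBoolean simply returns the primitive directly instead of a boxed Boolean that is immediately unboxed.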