diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index d9f6433a0..9d553ded3 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -14,6 +14,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; @@ -84,19 +85,26 @@ public class SparkCountryPropagationJob { Dataset res = readPath(spark, sourcePath, resultClazz); log.info("Reading prepared info: {}", preparedInfoPath); - Dataset prepared = spark + final Dataset preparedInfoRaw = spark .read() - .json(preparedInfoPath) - .as(Encoders.bean(ResultCountrySet.class)); - - res - .joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer") - .map(getCountryMergeFn(), Encoders.bean(resultClazz)) - .write() - .option("compression", "gzip") - .mode(SaveMode.Overwrite) - .json(outputPath); + .json(preparedInfoPath); + if (!preparedInfoRaw.isEmpty()) { + final Dataset prepared = preparedInfoRaw.as(Encoders.bean(ResultCountrySet.class)); + res + .joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer") + .map(getCountryMergeFn(), Encoders.bean(resultClazz)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + } else { + res + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + } } private static MapFunction, R> getCountryMergeFn() {