diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java
index cb150210a..1ae351dbf 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java
@@ -8,6 +8,7 @@ import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
@@ -20,7 +21,6 @@ import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
 
 /**
  * Reads all the entities of the same type (Relation / Results) and saves them in the same folder
- *
  */
 public class SparkCollectAndSave implements Serializable {
 
@@ -48,6 +48,11 @@ public class SparkCollectAndSave implements Serializable {
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
+		final Boolean aggregateResult = Optional
+			.ofNullable(parser.get("resultAggregation"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+
 		SparkConf conf = new SparkConf();
 
 		runWithSparkSession(
@@ -55,22 +60,51 @@ public class SparkCollectAndSave implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath + "/result");
-				run(spark, inputPath, outputPath);
+				run(spark, inputPath, outputPath, aggregateResult);
 
 			});
 
 	}
 
-	private static void run(SparkSession spark, String inputPath, String outputPath) {
-		Utils
-			.readPath(spark, inputPath + "/result/publication", Result.class)
-			.union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class))
-			.union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct", Result.class))
-			.union(Utils.readPath(spark, inputPath + "/result/software", Result.class))
-			.write()
-			.option("compression", "gzip")
-			.mode(SaveMode.Overwrite)
-			.json(outputPath + "/result");
+	/**
+	 * Collects the dumped results and relations and stores them, gzip-compressed, as JSON under the output path.
+	 *
+	 * @param spark the Spark session used to read and write the data
+	 * @param inputPath the folder containing the result and relation dumps, one sub-folder per result type
+	 * @param outputPath the folder the collected entities are written to
+	 * @param aggregate true to merge all result types into outputPath/result, false to write each result type to
+	 *            its own sub-folder of outputPath
+	 */
+	private static void run(SparkSession spark, String inputPath, String outputPath, boolean aggregate) {
+		if (aggregate) {
+			Utils
+				.readPath(spark, inputPath + "/result/publication", Result.class)
+				.union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class))
+				.union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct", Result.class))
+				.union(Utils.readPath(spark, inputPath + "/result/software", Result.class))
+				.write()
+				.option("compression", "gzip")
+				.mode(SaveMode.Overwrite)
+				.json(outputPath + "/result");
+		} else {
+			write(
+				Utils
+					.readPath(spark, inputPath + "/result/publication", Result.class),
+				outputPath + "/publication");
+			write(
+				Utils
+					.readPath(spark, inputPath + "/result/dataset", Result.class),
+				outputPath + "/dataset");
+			write(
+				Utils
+					.readPath(spark, inputPath + "/result/otherresearchproduct", Result.class),
+				outputPath + "/otherresearchproduct");
+			write(
+				Utils
+					.readPath(spark, inputPath + "/result/software", Result.class),
+				outputPath + "/software");
+
+		}
 
 		Utils
 			.readPath(spark, inputPath + "/relation/publication", Relation.class)
@@ -86,4 +120,18 @@ public class SparkCollectAndSave implements Serializable {
 			.json(outputPath + "/relation");
 
 	}
+
+	/**
+	 * Writes the dataset as gzip-compressed JSON, overwriting any content already present at the given path.
+	 *
+	 * @param dataSet the dataset to store
+	 * @param outputPath the destination folder
+	 */
+	private static void write(Dataset<?> dataSet, String outputPath) {
+		dataSet
+			.write()
+			.option("compression", "gzip")
+			.mode(SaveMode.Overwrite)
+			.json(outputPath);
+	}
 }