diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java index 5574aad753..5d0b75a8e0 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java @@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.util.*; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; @@ -19,6 +20,7 @@ import com.google.gson.Gson; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; public class PrepareResultCommunitySet { @@ -93,10 +95,24 @@ public class PrepareResultCommunitySet { result_organizationset .map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class)) .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); + .toJavaRDD() + .mapToPair(value -> new Tuple2<>(value.getResultId(), value)) + .reduceByKey((a, b) -> { + ArrayList cl = a.getCommunityList(); + b.getCommunityList().stream().forEach(s -> { + if (!cl.contains(s)) { + cl.add(s); + } + }); + a.setCommunityList(cl); + return a; + }) + .map(value -> OBJECT_MAPPER.writeValueAsString(value._2())) + .saveAsTextFile(outputPath, GzipCodec.class); +// .write() +// .mode(SaveMode.Overwrite) +// .option("compression", "gzip") +// .json(outputPath); } private static MapFunction mapResultCommunityFn(