added groupby id to fix multiple result with same id at join step

This commit is contained in:
Miriam Baglioni 2020-05-22 15:32:55 +02:00
parent 1e44703e3e
commit 8610ad5142
1 changed files with 20 additions and 4 deletions

View File

@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
@ -19,6 +20,7 @@ import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultCommunitySet {
@ -93,10 +95,24 @@ public class PrepareResultCommunitySet {
result_organizationset
.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.toJavaRDD()
.mapToPair(value -> new Tuple2<>(value.getResultId(), value))
.reduceByKey((a, b) -> {
ArrayList<String> cl = a.getCommunityList();
b.getCommunityList().stream().forEach(s -> {
if (!cl.contains(s)) {
cl.add(s);
}
});
a.setCommunityList(cl);
return a;
})
.map(value -> OBJECT_MAPPER.writeValueAsString(value._2()))
.saveAsTextFile(outputPath, GzipCodec.class);
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(outputPath);
}
private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(