enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
1 changed files with 13 additions and 1 deletions
Showing only changes of commit 669c05c771 - Show all commits

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -19,6 +20,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.rdd.SequenceFileRDDFunctions; import org.apache.spark.rdd.SequenceFileRDDFunctions;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
@ -122,7 +124,17 @@ public class SparkAtomicActionJob {
return null; return null;
}, Encoders.bean(Project.class)) }, Encoders.bean(Project.class))
.filter(p -> !(p == null)) .filter(Objects::nonNull)
.groupByKey(
(MapFunction<Project, String>) p -> p.getId(),
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
Project first = it.next();
it.forEachRemaining(p -> {
first.mergeFrom(p);
});
return first;
}, Encoders.bean(Project.class))
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(Project.class, p)) .map(p -> new AtomicAction(Project.class, p))
.mapToPair( .mapToPair(