1
0
Fork 0

Added explicit MapFunction type specification to the lambda passed to map

This commit is contained in:
Miriam Baglioni 2020-11-24 14:40:22 +01:00
parent 44db258dc4
commit 00c377dac2
1 changed file with 3 additions and 1 deletion

View File

@@ -8,6 +8,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@@ -20,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
import scala.Tuple2;
public class SparkUpdateProjectInfo implements Serializable { public class SparkUpdateProjectInfo implements Serializable {
@@ -73,7 +75,7 @@ public class SparkUpdateProjectInfo implements Serializable {
.joinWith( .joinWith(
resultProject, result.col("id").equalTo(resultProject.col("resultId")), resultProject, result.col("id").equalTo(resultProject.col("resultId")),
"left") "left")
.map(value -> { .map((MapFunction<Tuple2<CommunityResult, ResultProject>, CommunityResult>) value -> {
CommunityResult r = value._1(); CommunityResult r = value._1();
Optional.ofNullable(value._2()).ifPresent(rp -> { Optional.ofNullable(value._2()).ifPresent(rp -> {
r.setProjects(rp.getProjectsList()); r.setProjects(rp.getProjectsList());