diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java index c97d2d72ae..8a6c913952 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java @@ -11,6 +11,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -57,7 +58,7 @@ public class DumpProducts implements Serializable { Utils .readPath(spark, inputPath, inputClazz) - .map(value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz)) + .map((MapFunction) value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz)) .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index 86421cff51..3851c5d354 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -9,6 +9,7 @@ import java.util.*; import java.util.stream.Collectors; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -88,7 +89,9 @@ public class DumpGraphEntities implements Serializable { Class inputClazz) { Utils .readPath(spark, inputPath, inputClazz) - .map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) + .map( + (MapFunction) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), + Encoders.bean(Datasource.class)) .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) @@ -100,7 +103,9 @@ public class DumpGraphEntities implements Serializable { Class inputClazz) { Utils .readPath(spark, inputPath, inputClazz) - .map(p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class)) + .map( + (MapFunction) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), + Encoders.bean(Project.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -374,13 +379,17 @@ public class DumpGraphEntities implements Serializable { } project - .setProgramme( + .setH2020Classifications( Optional - .ofNullable(p.getProgramme()) + .ofNullable(p.getH2020classification()) .map( - programme -> programme + classification -> classification .stream() - .map(pg -> Programme.newInstance(pg.getCode(), pg.getDescription())) + .map( + c -> H2020Classification + .newInstance( + c.getH2020Programme().getCode(), c.getH2020Programme().getDescription(), + c.getLevel1(), c.getLevel2(), c.getLevel3(), c.getClassification())) .collect(Collectors.toList())) .orElse(new ArrayList<>())); @@ -442,7 +451,9 @@ public class DumpGraphEntities implements Serializable { Class inputClazz) { Utils .readPath(spark, inputPath, inputClazz) - .map(o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) + .map( + (MapFunction) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), + Encoders.bean(Organization.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip")