diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index 59e068c3f5..573be6de2e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -9,6 +9,7 @@ import java.util.*; import java.util.stream.Collectors; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -22,6 +23,8 @@ import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.*; import eu.dnetlib.dhp.schema.dump.oaf.graph.*; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.OafEntity; @@ -45,7 +48,7 @@ public class DumpGraphEntities implements Serializable { DumpProducts d = new DumpProducts(); d .run( - isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, Result.class, + isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, GraphResult.class, true); break; case "40": @@ -86,7 +89,7 @@ public class DumpGraphEntities implements Serializable { Class inputClazz) { Utils .readPath(spark, inputPath, inputClazz) - .map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) + .map((MapFunction) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) @@ -98,7 +101,7 @@ public class DumpGraphEntities implements Serializable { Class inputClazz) { Utils .readPath(spark, inputPath, inputClazz) - .map(p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class)) + .map((MapFunction) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -440,7 +443,7 @@ public class DumpGraphEntities implements Serializable { Class inputClazz) { Utils .readPath(spark, inputPath, inputClazz) - .map(o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) + .map((MapFunction) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip")