This commit is contained in:
Miriam Baglioni 2020-08-19 11:29:51 +02:00
parent e42b2f5ae2
commit 1c593a9cfe
1 changed files with 7 additions and 4 deletions

View File

@ -9,6 +9,7 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -22,6 +23,8 @@ import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.*; import eu.dnetlib.dhp.schema.dump.oaf.*;
import eu.dnetlib.dhp.schema.dump.oaf.graph.*; import eu.dnetlib.dhp.schema.dump.oaf.graph.*;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Funder;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Project;
import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
@ -45,7 +48,7 @@ public class DumpGraphEntities implements Serializable {
DumpProducts d = new DumpProducts(); DumpProducts d = new DumpProducts();
d d
.run( .run(
isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, Result.class, isSparkSessionManaged, inputPath, outputPath, communityMapPath, inputClazz, GraphResult.class,
true); true);
break; break;
case "40": case "40":
@ -86,7 +89,7 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) { Class<E> inputClazz) {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) .map((MapFunction<E, Datasource>) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
@ -98,7 +101,7 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) { Class<E> inputClazz) {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class)) .map((MapFunction<E,Project>) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -440,7 +443,7 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) { Class<E> inputClazz) {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) .map((MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")