changed the dump to move from h2020programme to h2020classification

This commit is contained in:
Miriam Baglioni 2020-09-23 17:33:00 +02:00
parent 1d84cf19a6
commit 39eb8ab25b
2 changed files with 20 additions and 8 deletions

View File

@ -11,6 +11,7 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -57,7 +58,7 @@ public class DumpProducts implements Serializable {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz)) .map((MapFunction<I, O>) value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)

View File

@ -9,6 +9,7 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -88,7 +89,9 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) { Class<E> inputClazz) {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) .map(
(MapFunction<E, Datasource>) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d),
Encoders.bean(Datasource.class))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
@ -100,7 +103,9 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) { Class<E> inputClazz) {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class)) .map(
(MapFunction<E, Project>) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p),
Encoders.bean(Project.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -374,13 +379,17 @@ public class DumpGraphEntities implements Serializable {
} }
project project
.setProgramme( .setH2020Classifications(
Optional Optional
.ofNullable(p.getProgramme()) .ofNullable(p.getH2020classification())
.map( .map(
programme -> programme classification -> classification
.stream() .stream()
.map(pg -> Programme.newInstance(pg.getCode(), pg.getDescription())) .map(
c -> H2020Classification
.newInstance(
c.getH2020Programme().getCode(), c.getH2020Programme().getDescription(),
c.getLevel1(), c.getLevel2(), c.getLevel3(), c.getClassification()))
.collect(Collectors.toList())) .collect(Collectors.toList()))
.orElse(new ArrayList<>())); .orElse(new ArrayList<>()));
@ -442,7 +451,9 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) { Class<E> inputClazz) {
Utils Utils
.readPath(spark, inputPath, inputClazz) .readPath(spark, inputPath, inputClazz)
.map(o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) .map(
(MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
Encoders.bean(Organization.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")