dump #50

Merged
claudio.atzori merged 98 commits from miriam.baglioni/dnet-hadoop:dump into master 2020-11-04 18:07:01 +01:00
3 changed files with 14 additions and 9 deletions
Showing only changes of commit 8694bb9b31 - Show all commits

View File

@ -8,8 +8,6 @@ import java.io.StringReader;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Funder;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Project;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
@ -91,7 +89,9 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) {
Utils
.readPath(spark, inputPath, inputClazz)
.map((MapFunction<E, Datasource>) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class))
.map(
(MapFunction<E, Datasource>) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d),
Encoders.bean(Datasource.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
@ -103,7 +103,9 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) {
Utils
.readPath(spark, inputPath, inputClazz)
.map((MapFunction<E,Project>) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class))
.map(
(MapFunction<E, Project>) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p),
Encoders.bean(Project.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
@ -445,7 +447,9 @@ public class DumpGraphEntities implements Serializable {
Class<E> inputClazz) {
Utils
.readPath(spark, inputPath, inputClazz)
.map((MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class))
.map(
(MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
Encoders.bean(Organization.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -49,9 +49,9 @@ public class SparkCollectAndSave implements Serializable {
log.info("outputPath: {}", outputPath);
final Boolean aggregateResult = Optional
.ofNullable(parser.get("resultAggregation"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
.ofNullable(parser.get("resultAggregation"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
SparkConf conf = new SparkConf();

View File

@ -8,6 +8,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@ -67,7 +68,7 @@ public class SparkDumpRelationJob implements Serializable {
private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) {
Utils
.readPath(spark, inputPath, Relation.class)
.map(relation -> {
.map((MapFunction<Relation, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) relation -> {
eu.dnetlib.dhp.schema.dump.oaf.graph.Relation rel = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
rel
.setSource(