Remove from the dump the organizations that have been deleted by inference

This commit is contained in:
Miriam Baglioni 2021-04-07 12:11:36 +02:00
parent 66d64947af
commit 7f9b7cfcf6
1 changed file with 3 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -453,7 +454,7 @@ public class DumpGraphEntities implements Serializable {
.map( .map(
(MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), (MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
Encoders.bean(Organization.class)) Encoders.bean(Organization.class))
.filter(Objects::nonNull) .filter((FilterFunction<Organization>) o -> o != null)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -461,11 +462,10 @@ public class DumpGraphEntities implements Serializable {
} }
private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) { private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) {
Organization organization = new Organization();
if (org.getDataInfo().getDeletedbyinference()) if (org.getDataInfo().getDeletedbyinference())
return null; return null;
Organization organization = new Organization();
Optional Optional
.ofNullable(org.getLegalshortname()) .ofNullable(org.getLegalshortname())
.ifPresent(value -> organization.setLegalshortname(value.getValue())); .ifPresent(value -> organization.setLegalshortname(value.getValue()));