diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java index d8d803458..37e693de9 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java @@ -4,9 +4,12 @@ package eu.dnetlib.dhp.oa.graph.clean.country; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.swing.text.html.Option; @@ -30,6 +33,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; import eu.dnetlib.dhp.schema.oaf.Country; +import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.utils.PidType; @@ -110,8 +114,8 @@ public class CleanCountrySparkJob implements Serializable { return r; } - if (r - .getPid() + List ids = getPidsAndAltIds(r).collect(Collectors.toList()); + if (ids .stream() .anyMatch( p -> p @@ -148,6 +152,42 @@ public class CleanCountrySparkJob implements Serializable { .json(inputPath); } + private static Stream getPidsAndAltIds(T r) { + final Stream resultPids = Optional + .ofNullable(r.getPid()) + .map(Collection::stream) + .orElse(Stream.empty()); + + final Stream instancePids = Optional + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .flatMap( + i -> Optional + .ofNullable(i.getPid()) + .map(Collection::stream) + .orElse(Stream.empty()))) + .orElse(Stream.empty()); + + final Stream instanceAltIds = Optional + .ofNullable(r.getInstance()) + .map( + instance -> instance + .stream() + .flatMap( + i -> Optional + .ofNullable(i.getAlternateIdentifier()) + .map(Collection::stream) + .orElse(Stream.empty()))) + .orElse(Stream.empty()); + + return Stream + .concat( + Stream.concat(resultPids, instancePids), + instanceAltIds); + } + private static boolean pidInParam(String value, String[] verifyParam) { for (String s : verifyParam) if (value.startsWith(s))