From 730228d73dbd6058cb0b2283412d9f3238dddc7a Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 8 Dec 2022 18:40:22 +0100
Subject: [PATCH 1/6] [cleaning] align wf parameter names in test

---
 .../dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java
index b0097ed6fc..9096180ef9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java
@@ -143,7 +143,7 @@ public class CleanCfHbSparkJobTest {
                 "--outputPath", outputPath,
                 "--resolvedPath", resolvedPath + "/dataset",
                 "--graphTableClassName", Dataset.class.getCanonicalName(),
-                "--datasourceMasterDuplicate", dsMasterDuplicatePath
+                "--masterDuplicatePath", dsMasterDuplicatePath
             });
 
         assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));

From 389dd25430c3fa193d93f74523f61a204fe4f50f Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 8 Dec 2022 18:40:48 +0100
Subject: [PATCH 2/6] [cleaning] avoid NPE

---
 .../dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
index eb7325af5f..f48226d717 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
@@ -13,6 +13,7 @@ import java.util.stream.Stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -105,6 +106,7 @@ public class CleanCfHbSparkJob {
         resolved
             .joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
             .map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
+            .filter((FilterFunction<IdCfHbMapping>) m -> Objects.nonNull(m.getMasterId()))
             .write()
             .mode(SaveMode.Overwrite)
             .json(resolvedPath);
@@ -134,9 +136,15 @@ public class CleanCfHbSparkJob {
     private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
         return t -> {
-            t._1().setMasterId(t._2().getMasterId());
-            t._1().setMasterName(t._2().getMasterName());
-            return t._1();
+            final IdCfHbMapping mapping = t._1();
+            Optional
+                .ofNullable(t._2())
+                .ifPresent(t2 -> {
+                    mapping.setMasterId(t2.getMasterId());
+                    mapping.setMasterName(t2.getMasterName());
+
+                });
+            return mapping;
         };
     }

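Two things work together in the patch above: asIdCfHbMapping() no longer dereferences t._2() blindly, so a mapping whose datasource id has no master duplicate keeps a null masterId instead of throwing, and the new typed FilterFunction then drops those unresolved mappings before they are written. A minimal, Spark-free sketch of the same copy-if-present-then-filter pattern (the Mapping and Master classes below are simplified stand-ins, not the dnet-hadoop beans):

    import java.util.*;
    import java.util.stream.*;

    public class NullSafeJoinSketch {

        // simplified stand-ins for IdCfHbMapping / the master-duplicate record
        static class Mapping {
            String cfhb;
            String masterId;
            Mapping(String cfhb) { this.cfhb = cfhb; }
        }

        static class Master {
            String duplicateId;
            String masterId;
            Master(String duplicateId, String masterId) { this.duplicateId = duplicateId; this.masterId = masterId; }
        }

        public static void main(String[] args) {
            List<Mapping> resolved = Arrays.asList(new Mapping("dup-1"), new Mapping("dup-2"));
            Map<String, Master> masters = new HashMap<>();
            masters.put("dup-1", new Master("dup-1", "master-1")); // "dup-2" has no master: simulates an unresolved join

            List<Mapping> kept = resolved
                .stream()
                .map(m -> {
                    // copy the master id only when a match exists, as the patched asIdCfHbMapping() does
                    Optional.ofNullable(masters.get(m.cfhb)).ifPresent(md -> m.masterId = md.masterId);
                    return m;
                })
                // drop mappings that could not be resolved, mirroring the new filter on getMasterId()
                .filter(m -> Objects.nonNull(m.masterId))
                .collect(Collectors.toList());

            System.out.println(kept.size()); // 1
        }
    }
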
From 8b44afe5e547612c5922e8a76dbf2f1ade5ceb23 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Fri, 9 Dec 2022 15:44:57 +0100
Subject: [PATCH 3/6] [cleaning] avoid NPE

---
 .../eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
index f48226d717..7cc26745b2 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
@@ -157,7 +157,8 @@ public class CleanCfHbSparkJob {
                     r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
                     r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
             .distinct()
-            .map(s -> asIdCfHbMapping(r.getId(), s))
+            .filter(StringUtils::isNotBlank)
+            .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb))
             .iterator();
     }

From c18b8048c378963641801caaff73cd708bfce5de Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Sat, 10 Dec 2022 11:41:38 +0100
Subject: [PATCH 4/6] [cleaning] avoid NPE

---
 .../graph/clean/cfhb/CleanCfHbSparkJob.java   | 27 ++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
index 7cc26745b2..531b415ed6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
@@ -151,11 +151,32 @@ private static FlatMapFunction flattenCfHbFn() {
         return r -> Stream
             .concat(
-                r.getCollectedfrom().stream().map(KeyValue::getKey),
+                Optional
+                    .ofNullable(r.getCollectedfrom())
+                    .map(cf -> cf.stream().map(KeyValue::getKey))
+                    .orElse(Stream.empty()),
                 Stream
                     .concat(
-                        r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
-                        r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
+                        Optional
+                            .ofNullable(r.getInstance())
+                            .map(
+                                instances -> instances
+                                    .stream()
+                                    .map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse("")))
+                            .orElse(Stream.empty())
+                            .filter(StringUtils::isNotBlank),
+                        Optional
+                            .ofNullable(r.getInstance())
+                            .map(
+                                instances -> instances
+                                    .stream()
+                                    .map(
+                                        i -> Optional
+                                            .ofNullable(i.getCollectedfrom())
+                                            .map(KeyValue::getKey)
+                                            .orElse("")))
+                            .orElse(Stream.empty())
+                            .filter(StringUtils::isNotBlank)))
             .distinct()
             .filter(StringUtils::isNotBlank)
             .map(cfHb -> asIdCfHbMapping(r.getId(), cfHb))

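The rewritten flattenCfHbFn() repeats one defensive idiom: never call .stream() on a possibly-null collection and never call getKey() on a possibly-null element; wrap both in Optional, fall back to an empty stream or an empty string, and filter out blanks afterwards. A compact sketch of that idiom outside Spark (the data shapes and key extractor are illustrative, not the actual KeyValue/Instance classes):

    import java.util.*;
    import java.util.function.Function;
    import java.util.stream.*;

    public class NullSafeFlattenSketch {

        // turns a possibly-null list into a stream of non-blank keys extracted from its elements
        static <T> Stream<String> safeKeys(List<T> list, Function<T, String> keyExtractor) {
            return Optional
                .ofNullable(list)
                .map(l -> l.stream().map(e -> Optional.ofNullable(e).map(keyExtractor).orElse("")))
                .orElse(Stream.empty())
                .filter(s -> s != null && !s.trim().isEmpty());
        }

        public static void main(String[] args) {
            List<String[]> collectedfrom = null; // may legitimately be missing in the graph
            List<String[]> instances = Arrays.asList(new String[] { "cf-id-1" }, null);

            List<String> ids = Stream
                .concat(
                    safeKeys(collectedfrom, kv -> kv[0]),
                    safeKeys(instances, kv -> kv[0]))
                .distinct()
                .collect(Collectors.toList());

            System.out.println(ids); // [cf-id-1] — no NullPointerException on the null list or the null element
        }
    }
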
From b8bafab8a0f380befb8544d2ffe44f898b89ffab Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 12 Dec 2022 14:43:03 +0100
Subject: [PATCH 5/6] [cleaning] improved vocabulary based mapping, specialization for the strict vocab cleaning

---
 .../dhp/common/vocabulary/Vocabulary.java     | 27 ++++++++++++++------
 .../oaf/utils/GraphCleaningFunctions.java     |  2 +-
 .../dhp/oa/graph/clean/CleaningRuleMap.java   |  2 +-
 .../dhp/oa/graph/clean/CleanContextTest.java  |  8 +++---
 .../dhp/oa/graph/clean/CleanCountryTest.java  | 12 ++++-----
 .../clean/GraphCleaningFunctionsTest.java     | 24 ++++++++++-------
 .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 21 ++++++++++++---
 .../eu/dnetlib/dhp/oa/graph/clean/result.json | 15 +++++------
 8 files changed, 71 insertions(+), 40 deletions(-)

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java
index 3a8df5c9e2..2ab23bda6f 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.common.vocabulary;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
@@ -66,27 +67,39 @@ public class Vocabulary implements Serializable {
     }
 
     public Qualifier getTermAsQualifier(final String termId) {
-        if (StringUtils.isBlank(termId)) {
+        return getTermAsQualifier(termId, false);
+    }
+
+    public Qualifier getTermAsQualifier(final String termId, boolean strict) {
+        final VocabularyTerm term = getTerm(termId);
+        if (Objects.nonNull(term)) {
+            return OafMapperUtils.qualifier(term.getId(), term.getName(), getId(), getName());
+        } else if (Objects.isNull(term) && strict) {
             return OafMapperUtils.unknown(getId(), getName());
-        } else if (termExists(termId)) {
-            final VocabularyTerm t = getTerm(termId);
-            return OafMapperUtils.qualifier(t.getId(), t.getName(), getId(), getName());
         } else {
             return OafMapperUtils.qualifier(termId, termId, getId(), getName());
         }
     }
 
     public Qualifier getSynonymAsQualifier(final String syn) {
+        return getSynonymAsQualifier(syn, false);
+    }
+
+    public Qualifier getSynonymAsQualifier(final String syn, boolean strict) {
         return Optional
             .ofNullable(getTermBySynonym(syn))
-            .map(term -> getTermAsQualifier(term.getId()))
+            .map(term -> getTermAsQualifier(term.getId(), strict))
             .orElse(null);
     }
 
     public Qualifier lookup(String id) {
+        return lookup(id, false);
+    }
+
+    public Qualifier lookup(String id, boolean strict) {
         return Optional
-            .ofNullable(getSynonymAsQualifier(id))
-            .orElse(getTermAsQualifier(id));
+            .ofNullable(getSynonymAsQualifier(id, strict))
+            .orElse(getTermAsQualifier(id, strict));
     }
 
 }

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
index 363f954234..347d3eb201 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
@@ -333,7 +333,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
                     if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
                         i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
                     }
-                    if (Objects.isNull(i.getRefereed())) {
+                    if (Objects.isNull(i.getRefereed()) || StringUtils.isBlank(i.getRefereed().getClassid())) {
                         i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
                     }
                     if (Objects.nonNull(i.getDateofacceptance())) {

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java
index 5f3b4e1caa..3d501bb275 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java
@@ -42,7 +42,7 @@ public class CleaningRuleMap extends HashMap, SerializableConsumer {
             if (ModelConstants.DNET_SUBJECT_KEYWORD.equalsIgnoreCase(subject.getQualifier().getClassid())) {
-                Qualifier newValue = vocabulary.lookup(subject.getValue());
+                Qualifier newValue = vocabulary.lookup(subject.getValue(), true);
                 if (!ModelConstants.UNKNOWN.equals(newValue.getClassid())) {
                     subject.setValue(newValue.getClassid());
                     subject.getQualifier().setClassid(vocabularyId);

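The strict flag introduced above changes what happens when a term is not in the vocabulary: the lenient lookup echoes the value back as a qualifier, while the strict lookup returns the vocabulary's UNKNOWN term, which is exactly what CleaningRuleMap now checks for before overwriting a keyword subject. A toy, self-contained illustration of the two behaviours (plain maps and string arrays instead of the real Vocabulary, VocabularyTerm and Qualifier classes):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class StrictLookupSketch {

        static final String UNKNOWN = "UNKNOWN";

        // toy vocabulary: term id -> term name
        static final Map<String, String> VOCAB = new HashMap<>();
        static {
            VOCAB.put("T1", "Term One");
        }

        // mirrors the shape of Vocabulary.getTermAsQualifier(termId, strict): returns [classid, classname]
        static String[] lookup(String termId, boolean strict) {
            String name = VOCAB.get(termId);
            if (name != null) {
                return new String[] { termId, name };     // known term: map it
            } else if (strict) {
                return new String[] { UNKNOWN, UNKNOWN }; // strict: flag as unknown so the caller can leave the subject alone
            } else {
                return new String[] { termId, termId };   // lenient: echo the free-text value back as a qualifier
            }
        }

        public static void main(String[] args) {
            System.out.println(Arrays.toString(lookup("T1", true)));            // [T1, Term One]
            System.out.println(Arrays.toString(lookup("free keyword", false))); // [free keyword, free keyword]
            System.out.println(Arrays.toString(lookup("free keyword", true)));  // [UNKNOWN, UNKNOWN]
        }
    }

In the real code the unknown branch goes through OafMapperUtils.unknown(getId(), getName()); the sketch only reproduces the decision logic.
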
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java
index e206c7c5a9..91094f5346 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java
@@ -82,10 +82,10 @@ public class CleanContextTest {
         CleanContextSparkJob.main(new String[] {
             "--isSparkSessionManaged", Boolean.FALSE.toString(),
             "--inputPath", workingDir.toString() + "/publication",
-            "-graphTableClassName", Publication.class.getCanonicalName(),
-            "-workingPath", workingDir.toString() + "/working",
-            "-contextId", "sobigdata",
-            "-verifyParam", "gCube "
+            "--graphTableClassName", Publication.class.getCanonicalName(),
+            "--workingDir", workingDir.toString() + "/working",
+            "--contextId", "sobigdata",
+            "--verifyParam", "gCube "
         });
 
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java
index c9f8465708..caf19c21ce 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanCountryTest.java
@@ -83,12 +83,12 @@ public class CleanCountryTest {
         CleanCountrySparkJob.main(new String[] {
             "--isSparkSessionManaged", Boolean.FALSE.toString(),
             "--inputPath", workingDir.toString() + "/publication",
-            "-graphTableClassName", Publication.class.getCanonicalName(),
-            "-workingPath", workingDir.toString() + "/working",
-            "-country", "NL",
-            "-verifyParam", "10.17632",
-            "-collectedfrom", "NARCIS",
-            "-hostedBy", getClass()
+            "--graphTableClassName", Publication.class.getCanonicalName(),
+            "--workingDir", workingDir.toString() + "/working",
+            "--country", "NL",
+            "--verifyParam", "10.17632",
+            "--collectedfrom", "NARCIS",
+            "--hostedBy", getClass()
                 .getResource("/eu/dnetlib/dhp/oa/graph/clean/hostedBy")
                 .getPath()
         });

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java
index 4035307e5a..fc7c6e5f1c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java
@@ -7,6 +7,7 @@ import static org.mockito.Mockito.lenient;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -278,20 +279,25 @@ public class GraphCleaningFunctionsTest {
                     s -> "0102 computer and information sciences".equals(s.getValue()) &
                         ModelConstants.DNET_SUBJECT_FOS_CLASSID.equals(s.getQualifier().getClassid())));
 
-        List<Subject> s1 = p_cleaned
-            .getSubject()
-            .stream()
-            .filter(s -> s.getValue().equals("In Situ Hybridization"))
-            .collect(Collectors.toList());
-        assertNotNull(s1);
-        assertEquals(1, s1.size());
-        assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get(0).getQualifier().getClassid());
-        assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get(0).getQualifier().getClassname());
+        verify_keyword(p_cleaned, "In Situ Hybridization");
+        verify_keyword(p_cleaned, "Avicennia");
 
         // TODO add more assertions to verity the cleaned values
         System.out.println(MAPPER.writeValueAsString(p_cleaned));
     }
 
+    private static void verify_keyword(Publication p_cleaned, String subject) {
+        Optional<Subject> s1 = p_cleaned
+            .getSubject()
+            .stream()
+            .filter(s -> s.getValue().equals(subject))
+            .findFirst();
+
+        assertTrue(s1.isPresent());
+        assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassid());
+        assertEquals(ModelConstants.DNET_SUBJECT_KEYWORD, s1.get().getQualifier().getClassname());
+    }
+
     private Stream getAuthorPidTypes(Result pub) {
         return pub
             .getAuthor()

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index ad6ceef54c..d085453887 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
 import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -238,7 +239,11 @@ class MappersTest {
             assertNotNull(i.getAccessright());
             assertEquals("OPEN", i.getAccessright().getClassid());
         });
-        assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
+
+        Publication p_cleaned = cleanup(p, vocs);
+        assertEquals("0000", p_cleaned.getInstance().get(0).getRefereed().getClassid());
+        assertEquals("Unknown", p_cleaned.getInstance().get(0).getRefereed().getClassname());
+
         assertNotNull(p.getInstance().get(0).getPid());
         assertEquals(2, p.getInstance().get(0).getPid().size());
 
@@ -453,7 +458,10 @@ class MappersTest {
             assertNotNull(i.getAccessright());
             assertEquals("OPEN", i.getAccessright().getClassid());
         });
-        assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
+
+        Publication p_cleaned = cleanup(p, vocs);
+        assertEquals("0000", p_cleaned.getInstance().get(0).getRefereed().getClassid());
+        assertEquals("Unknown", p_cleaned.getInstance().get(0).getRefereed().getClassname());
     }
 
     @Test
@@ -570,7 +578,9 @@ class MappersTest {
         assertTrue(i.getUrl().contains("http://apps.who.int/trialsearch/Trial3.aspx?trialid=NCT02321059"));
         assertTrue(i.getUrl().contains("https://clinicaltrials.gov/ct2/show/NCT02321059"));
 
-        assertEquals("UNKNOWN", i.getRefereed().getClassid());
+        Dataset d_cleaned = cleanup(d, vocs);
+        assertEquals("0000", d_cleaned.getInstance().get(0).getRefereed().getClassid());
+        assertEquals("Unknown", d_cleaned.getInstance().get(0).getRefereed().getClassname());
     }
 
     @Test
@@ -871,7 +881,10 @@ class MappersTest {
             assertNotNull(i.getAccessright());
             assertEquals("UNKNOWN", i.getAccessright().getClassid());
         });
-        assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid());
+
+        Dataset p_cleaned = cleanup(p, vocs);
+        assertEquals("0000", p_cleaned.getInstance().get(0).getRefereed().getClassid());
+        assertEquals("Unknown", p_cleaned.getInstance().get(0).getRefereed().getClassname());
     }
 
     @Test

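The new MappersTest assertions exercise the cleanup path touched above: after GraphCleaningFunctions runs, an instance whose refereed qualifier is missing or left with a blank classid ends up with the default ("0000", "Unknown") from dnet:review_levels. A reduced sketch of that defaulting rule (Qualifier is a simplified stand-in here; only the patched null-or-blank check is reproduced):

    import java.util.Objects;

    public class RefereedDefaultSketch {

        // simplified stand-in for the schema's Qualifier
        static class Qualifier {
            String classid;
            String classname;
            Qualifier(String classid, String classname) { this.classid = classid; this.classname = classname; }
            public String toString() { return classid + "/" + classname; }
        }

        static boolean isBlank(String s) { return s == null || s.trim().isEmpty(); }

        // mirrors the patched condition: default when refereed is null OR its classid is blank
        static Qualifier cleanRefereed(Qualifier refereed) {
            if (Objects.isNull(refereed) || isBlank(refereed.classid)) {
                return new Qualifier("0000", "Unknown");
            }
            return refereed;
        }

        public static void main(String[] args) {
            System.out.println(cleanRefereed(null));                          // 0000/Unknown
            System.out.println(cleanRefereed(new Qualifier("", "")));         // 0000/Unknown
            System.out.println(cleanRefereed(new Qualifier("0001", "peerReviewed"))); // unchanged pass-through
        }
    }
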
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json
index 84ff35c086..8f35470e11 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json
@@ -907,24 +907,23 @@
     {
       "dataInfo": {
         "deletedbyinference": false,
-        "inferenceprovenance": "",
         "inferred": false,
         "invisible": false,
         "provenanceaction": {
-          "classid": "sysimport:crosswalk:datasetarchive",
-          "classname": "sysimport:crosswalk:datasetarchive",
+          "classid": "sysimport:actionset",
+          "classname": "Harvested",
           "schemeid": "dnet:provenanceActions",
           "schemename": "dnet:provenanceActions"
         },
         "trust": "0.9"
       },
       "qualifier": {
-        "classid": "",
-        "classname": "",
-        "schemeid": "",
-        "schemename": ""
+        "classid": "FOS",
+        "classname": "Fields of Science and Technology classification",
+        "schemeid": "dnet:subject_classification_typologies",
+        "schemename": "dnet:subject_classification_typologies"
      },
-      "value": "doped silicon"
+      "value": "Avicennia"
     },
     {
       "dataInfo": {

From 7b80b24f8268b5283a56867feee02753f5fd516b Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 15 Dec 2022 14:49:04 +0100
Subject: [PATCH 6/6] [cleaning] country cleaning must use both PID and AlternateIdentifier fields

---
 .../clean/country/CleanCountrySparkJob.java   | 44 ++++++++++++++++++-
 1 file changed, 42 insertions(+), 2 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java
index d8d8034586..37e693de9a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/country/CleanCountrySparkJob.java
@@ -4,9 +4,12 @@ package eu.dnetlib.dhp.oa.graph.clean.country;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import javax.swing.text.html.Option;
@@ -30,6 +33,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
 import eu.dnetlib.dhp.schema.oaf.Country;
+import eu.dnetlib.dhp.schema.oaf.Instance;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import eu.dnetlib.dhp.schema.oaf.utils.PidType;
@@ -110,8 +114,8 @@ public class CleanCountrySparkJob implements Serializable {
                     return r;
                 }
 
-                if (r
-                    .getPid()
+                List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
+                if (ids
                     .stream()
                     .anyMatch(
                         p -> p
@@ -148,6 +152,42 @@ public class CleanCountrySparkJob implements Serializable {
                 .json(inputPath);
     }
 
+    private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
+        final Stream<StructuredProperty> resultPids = Optional
+            .ofNullable(r.getPid())
+            .map(Collection::stream)
+            .orElse(Stream.empty());
+
+        final Stream<StructuredProperty> instancePids = Optional
+            .ofNullable(r.getInstance())
+            .map(
+                instance -> instance
+                    .stream()
+                    .flatMap(
+                        i -> Optional
+                            .ofNullable(i.getPid())
+                            .map(Collection::stream)
+                            .orElse(Stream.empty())))
+            .orElse(Stream.empty());
+
+        final Stream<StructuredProperty> instanceAltIds = Optional
+            .ofNullable(r.getInstance())
+            .map(
+                instance -> instance
+                    .stream()
+                    .flatMap(
+                        i -> Optional
+                            .ofNullable(i.getAlternateIdentifier())
+                            .map(Collection::stream)
+                            .orElse(Stream.empty())))
+            .orElse(Stream.empty());
+
+        return Stream
+            .concat(
+                Stream.concat(resultPids, instancePids),
+                instanceAltIds);
+    }
+
     private static boolean pidInParam(String value, String[] verifyParam) {
         for (String s : verifyParam)
             if (value.startsWith(s))
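
Before this change the country check consulted only the result-level pid list; getPidsAndAltIds() widens it to the union of result pids, instance pids and instance alternate identifiers, each accessed null-safely. A stand-alone sketch of that union, with simplified classes in place of the dhp schema's Result, Instance and StructuredProperty:

    import java.util.*;
    import java.util.stream.*;

    public class PidUnionSketch {

        // simplified stand-ins for Instance and Result
        static class Instance {
            List<String> pid = new ArrayList<>();
            List<String> alternateIdentifier = new ArrayList<>();
        }

        static class Result {
            List<String> pid;        // may be null
            List<Instance> instance; // may be null
        }

        static <T> Stream<T> safeStream(Collection<T> c) {
            return Optional.ofNullable(c).map(Collection::stream).orElse(Stream.empty());
        }

        // union of result-level pids, instance-level pids and instance-level alternate identifiers
        static Stream<String> pidsAndAltIds(Result r) {
            Stream<String> resultPids = safeStream(r.pid);
            Stream<String> instancePids = safeStream(r.instance).flatMap(i -> safeStream(i.pid));
            Stream<String> instanceAltIds = safeStream(r.instance).flatMap(i -> safeStream(i.alternateIdentifier));
            return Stream.concat(Stream.concat(resultPids, instancePids), instanceAltIds);
        }

        public static void main(String[] args) {
            Result r = new Result();                      // result-level pid list left null on purpose
            Instance i = new Instance();
            i.alternateIdentifier.add("10.17632/abc123"); // only an alternate identifier is present (suffix is made up)
            r.instance = Collections.singletonList(i);

            // the identifier is now visible to the country-cleaning check
            System.out.println(pidsAndAltIds(r).collect(Collectors.toList()));
        }
    }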