From 2b46b87f56a209c5ca91803920121b96a5535306 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 19 Nov 2021 11:30:29 +0100 Subject: [PATCH 1/2] fixed filtering criteria applied in SparkCopyRelationsNoOpenorgs to keep the parent/child relations from OpenOrgs --- .../dhp/oa/dedup/AbstractSparkAction.java | 24 +++++++++++++++---- .../dedup/SparkCopyRelationsNoOpenorgs.java | 2 +- .../oa/dedup/SparkOpenorgsProvisionTest.java | 24 ++++++++++++++++--- .../openorgs/provision/relation/part-00000 | 2 ++ 4 files changed, 43 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 6a9b21b003..1364133763 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -139,14 +139,28 @@ abstract class AbstractSparkAction implements Serializable { protected boolean isOpenorgs(Relation rel) { return Optional .ofNullable(rel.getCollectedfrom()) - .map( - c -> c - .stream() - .filter(Objects::nonNull) - .anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue()))) + .map(c -> isCollectedFromOpenOrgs(c)) .orElse(false); } + protected boolean isOpenorgsDedupRel(Relation rel) { + return isOpenorgs(rel) && isOpenOrgsDedupMergeRelation(rel); + } + + private boolean isCollectedFromOpenOrgs(List c) { + return c + .stream() + .filter(Objects::nonNull) + .anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue())); + } + + private boolean isOpenOrgsDedupMergeRelation(Relation rel) { + return ModelConstants.ORG_ORG_RELTYPE.equals(rel.getRelType()) && + ModelConstants.DEDUP.equals(rel.getSubRelType()) + && (ModelConstants.IS_MERGED_IN.equals(rel.getRelClass()) || + ModelConstants.MERGES.equals(rel.getRelClass())); + } + protected static Boolean parseECField(Field field) { if (field == null) return null; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java index 9cc003bf69..62cbb5bff0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java @@ -61,7 +61,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction { .textFile(relationPath) .map(patchRelFn(), Encoders.bean(Relation.class)) .toJavaRDD() - .filter(x -> !isOpenorgs(x)); + .filter(x -> !isOpenorgsDedupRel(x)); if (log.isDebugEnabled()) { log.debug("Number of non-Openorgs relations collected: {}", simRels.count()); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java index 2349ffebe8..3cd695524e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java @@ -11,6 +11,8 @@ import java.io.IOException; import java.io.Serializable; import java.net.URISyntaxException; import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -29,6 +31,8 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @@ -226,9 +230,23 @@ public class SparkOpenorgsProvisionTest implements Serializable { new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + final JavaRDD rels = jsc.textFile(testDedupGraphBasePath + "/relation"); + + long relations = rels.count(); + + final ObjectMapper mapper = new ObjectMapper(); + List relTypes = rels + .map(r -> mapper.readValue(r, Relation.class)) + .map( + r -> r.getRelType() + "_" + r.getSubRelType() + "_" + r.getRelClass() + "|" + + r.getCollectedfrom().stream().map(cf -> cf.getValue()).collect(Collectors.joining(","))) + .distinct() + .collect(); + + relTypes.forEach(r -> System.out.println("relType: " + r)); + + assertEquals(2382, relations); - assertEquals(2380, relations); } @Test @@ -250,7 +268,7 @@ public class SparkOpenorgsProvisionTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4894, relations); + assertEquals(4896, relations); // check deletedbyinference final Dataset mergeRels = spark diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 index 67d491ca2f..35d92089dd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 @@ -2518,3 +2518,5 @@ {"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::5c351d85f02db01ca291acd119f0bd78", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|opendoar____::37248e2f6987b18670dd2b8a51d6ef55", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} {"subRelType": "dedup", "relClass": "merges", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} {"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} +{"subRelType": "relationship", "relClass": "IsParentOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} +{"subRelType": "relationship", "relClass": "IsChildOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} From f4538f3c4c29df211b9c8eb5f2560620677ff642 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 19 Nov 2021 11:33:10 +0100 Subject: [PATCH 2/2] cleanup --- .../dhp/oa/dedup/SparkOpenorgsProvisionTest.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java index 3cd695524e..2a9f34deee 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java @@ -232,20 +232,7 @@ public class SparkOpenorgsProvisionTest implements Serializable { final JavaRDD rels = jsc.textFile(testDedupGraphBasePath + "/relation"); - long relations = rels.count(); - - final ObjectMapper mapper = new ObjectMapper(); - List relTypes = rels - .map(r -> mapper.readValue(r, Relation.class)) - .map( - r -> r.getRelType() + "_" + r.getSubRelType() + "_" + r.getRelClass() + "|" + - r.getCollectedfrom().stream().map(cf -> cf.getValue()).collect(Collectors.joining(","))) - .distinct() - .collect(); - - relTypes.forEach(r -> System.out.println("relType: " + r)); - - assertEquals(2382, relations); + assertEquals(2382, rels.count()); }