diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 6a9b21b003..1364133763 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -139,14 +139,28 @@ abstract class AbstractSparkAction implements Serializable { protected boolean isOpenorgs(Relation rel) { return Optional .ofNullable(rel.getCollectedfrom()) - .map( - c -> c - .stream() - .filter(Objects::nonNull) - .anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue()))) + .map(c -> isCollectedFromOpenOrgs(c)) .orElse(false); } + protected boolean isOpenorgsDedupRel(Relation rel) { + return isOpenorgs(rel) && isOpenOrgsDedupMergeRelation(rel); + } + + private boolean isCollectedFromOpenOrgs(List c) { + return c + .stream() + .filter(Objects::nonNull) + .anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue())); + } + + private boolean isOpenOrgsDedupMergeRelation(Relation rel) { + return ModelConstants.ORG_ORG_RELTYPE.equals(rel.getRelType()) && + ModelConstants.DEDUP.equals(rel.getSubRelType()) + && (ModelConstants.IS_MERGED_IN.equals(rel.getRelClass()) || + ModelConstants.MERGES.equals(rel.getRelClass())); + } + protected static Boolean parseECField(Field field) { if (field == null) return null; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java index 9cc003bf69..62cbb5bff0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java @@ -61,7 +61,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction { .textFile(relationPath) .map(patchRelFn(), Encoders.bean(Relation.class)) .toJavaRDD() - .filter(x -> !isOpenorgs(x)); + .filter(x -> !isOpenorgsDedupRel(x)); if (log.isDebugEnabled()) { log.debug("Number of non-Openorgs relations collected: {}", simRels.count()); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java index 2349ffebe8..2a9f34deee 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsProvisionTest.java @@ -11,6 +11,8 @@ import java.io.IOException; import java.io.Serializable; import java.net.URISyntaxException; import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -29,6 +31,8 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @@ -226,9 +230,10 @@ public class SparkOpenorgsProvisionTest implements Serializable { new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService); - long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); + final JavaRDD rels = jsc.textFile(testDedupGraphBasePath + "/relation"); + + assertEquals(2382, rels.count()); - assertEquals(2380, relations); } @Test @@ -250,7 +255,7 @@ public class SparkOpenorgsProvisionTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4894, relations); + assertEquals(4896, relations); // check deletedbyinference final Dataset mergeRels = spark diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 index 67d491ca2f..35d92089dd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/openorgs/provision/relation/part-00000 @@ -2518,3 +2518,5 @@ {"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::5c351d85f02db01ca291acd119f0bd78", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|opendoar____::37248e2f6987b18670dd2b8a51d6ef55", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} {"subRelType": "dedup", "relClass": "merges", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} {"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} +{"subRelType": "relationship", "relClass": "IsParentOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []} +{"subRelType": "relationship", "relClass": "IsChildOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}