1
0
Fork 0

fixed filtering criteria applied in SparkCopyRelationsNoOpenorgs to keep the parent/child relations from OpenOrgs

This commit is contained in:
Claudio Atzori 2021-11-19 11:30:29 +01:00
parent a24b9f8268
commit 2b46b87f56
4 changed files with 43 additions and 9 deletions

View File

@ -139,12 +139,26 @@ abstract class AbstractSparkAction implements Serializable {
protected boolean isOpenorgs(Relation rel) {
return Optional
.ofNullable(rel.getCollectedfrom())
.map(
c -> c
.map(c -> isCollectedFromOpenOrgs(c))
.orElse(false);
}
protected boolean isOpenorgsDedupRel(Relation rel) {
return isOpenorgs(rel) && isOpenOrgsDedupMergeRelation(rel);
}
private boolean isCollectedFromOpenOrgs(List<KeyValue> c) {
return c
.stream()
.filter(Objects::nonNull)
.anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue())))
.orElse(false);
.anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue()));
}
private boolean isOpenOrgsDedupMergeRelation(Relation rel) {
return ModelConstants.ORG_ORG_RELTYPE.equals(rel.getRelType()) &&
ModelConstants.DEDUP.equals(rel.getSubRelType())
&& (ModelConstants.IS_MERGED_IN.equals(rel.getRelClass()) ||
ModelConstants.MERGES.equals(rel.getRelClass()));
}
protected static Boolean parseECField(Field<String> field) {

View File

@ -61,7 +61,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
.textFile(relationPath)
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(x -> !isOpenorgs(x));
.filter(x -> !isOpenorgsDedupRel(x));
if (log.isDebugEnabled()) {
log.debug("Number of non-Openorgs relations collected: {}", simRels.count());

View File

@ -11,6 +11,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -29,6 +31,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@ -226,9 +230,23 @@ public class SparkOpenorgsProvisionTest implements Serializable {
new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
final JavaRDD<String> rels = jsc.textFile(testDedupGraphBasePath + "/relation");
long relations = rels.count();
final ObjectMapper mapper = new ObjectMapper();
List<String> relTypes = rels
.map(r -> mapper.readValue(r, Relation.class))
.map(
r -> r.getRelType() + "_" + r.getSubRelType() + "_" + r.getRelClass() + "|" +
r.getCollectedfrom().stream().map(cf -> cf.getValue()).collect(Collectors.joining(",")))
.distinct()
.collect();
relTypes.forEach(r -> System.out.println("relType: " + r));
assertEquals(2382, relations);
assertEquals(2380, relations);
}
@Test
@ -250,7 +268,7 @@ public class SparkOpenorgsProvisionTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
assertEquals(4894, relations);
assertEquals(4896, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark

View File

@ -2518,3 +2518,5 @@
{"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::5c351d85f02db01ca291acd119f0bd78", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|opendoar____::37248e2f6987b18670dd2b8a51d6ef55", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "dedup", "relClass": "merges", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "dedup", "relClass": "isMergedIn", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "relationship", "relClass": "IsParentOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}
{"subRelType": "relationship", "relClass": "IsChildOf", "dataInfo": {"provenanceaction": {"classid": "sysimport:crosswalk:entityregistry", "classname": "sysimport:crosswalk:entityregistry", "schemeid": "dnet:provenanceActions", "schemename": "dnet:provenanceActions"}, "deletedbyinference": false, "inferred": false, "inferenceprovenance": "", "invisible": false, "trust": "0.990"}, "target": "20|corda_______::6acb33e6ea8c6fcdabc891c80d083c64", "lastupdatetimestamp": 1617801137807, "relType": "organizationOrganization", "source": "20|openorgs____::e38c1a27fcb0f0ab218828e4f5fc7be9", "validationDate": null, "collectedfrom": [{"dataInfo": null, "key": "10|openaire____::0362fcdb3076765d9c0041ad331553e8", "value": "OpenOrgs Database"}], "validated": false, "properties": []}