From e3f7798d1bb7ee367cba0ed25d613c05943a880f Mon Sep 17 00:00:00 2001 From: miconis Date: Tue, 29 Sep 2020 15:31:46 +0200 Subject: [PATCH] minor changes in dedup tests, bug fix in the idgenerator and pace-core version update --- dhp-workflows/dhp-dedup-openaire/pom.xml | 4 + .../dhp/oa/dedup/AbstractSparkAction.java | 1 + .../dhp/oa/dedup/DedupRecordFactory.java | 27 +- .../eu/dnetlib/dhp/oa/dedup/IdGenerator.java | 146 +++++---- .../eu/dnetlib/dhp/oa/dedup/Identifier.java | 208 +++++++------ .../eu/dnetlib/dhp/oa/dedup/OrgSimRel.java | 135 ++++---- .../java/eu/dnetlib/dhp/oa/dedup/PidType.java | 30 +- .../dhp/oa/dedup/SparkCollectSimRels.java | 288 +++++++++--------- .../dhp/oa/dedup/SparkCreateMergeRels.java | 14 +- .../dhp/oa/dedup/SparkCreateSimRels.java | 22 +- .../dhp/oa/dedup/SparkPrepareOrgRels.java | 274 +++++++++-------- .../oa/dedup/orgsdedup/oozie_app/workflow.xml | 27 +- .../oa/dedup/prepareOrgRels_parameters.json | 8 +- .../dhp/oa/dedup/EntityMergerTest.java | 8 +- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 226 +++++++------- pom.xml | 2 +- 16 files changed, 751 insertions(+), 669 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml index 03ddbcf4c8..ff11c66e05 100644 --- a/dhp-workflows/dhp-dedup-openaire/pom.xml +++ b/dhp-workflows/dhp-dedup-openaire/pom.xml @@ -90,6 +90,10 @@ com.fasterxml.jackson.core jackson-core + + org.apache.httpcomponents + httpclient + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 74cecb7b6b..9a11277645 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -29,6 +29,7 @@ import eu.dnetlib.pace.config.DedupConfig; abstract class AbstractSparkAction implements Serializable { protected static final int NUM_PARTITIONS = 1000; + protected static final int NUM_CONNECTIONS = 20; protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 0fc393ea5f..50dda887b6 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,12 +1,10 @@ package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + import org.apache.commons.lang.StringUtils; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; @@ -15,11 +13,15 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; public class DedupRecordFactory { @@ -80,14 +82,14 @@ public class DedupRecordFactory { final Collection dates = Lists.newArrayList(); final List> authors = Lists.newArrayList(); - final List bestPids = Lists.newArrayList(); //best pids list + final List bestPids = Lists.newArrayList(); // best pids list entities .forEachRemaining( t -> { T duplicate = t._2(); - //prepare the list of pids to use for the id generation + // prepare the list of pids to use for the id generation bestPids.addAll(IdGenerator.bestPidtoIdentifier(duplicate)); entity.mergeFrom(duplicate); @@ -115,5 +117,4 @@ public class DedupRecordFactory { return entity; } - } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java index 2d203a1b19..2916e063d2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java @@ -1,90 +1,112 @@ -package eu.dnetlib.dhp.oa.dedup; -import com.google.common.collect.Lists; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import org.apache.commons.lang.NullArgumentException; -import org.apache.commons.lang.StringUtils; +package eu.dnetlib.dhp.oa.dedup; import java.io.Serializable; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; +import org.apache.commons.lang.NullArgumentException; +import org.apache.commons.lang.StringUtils; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; + public class IdGenerator implements Serializable { - private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); - public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2"; - public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254"; + public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2"; + public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254"; - //pick the best pid from the list (consider date and pidtype) - public static String generate(List pids, String defaultID) { - if (pids == null || pids.size() == 0) - return defaultID; + // pick the best pid from the list (consider date and pidtype) + public static String generate(List pids, String defaultID) { + if (pids == null || pids.size() == 0) + return defaultID; - Optional bp = pids.stream() - .max(Identifier::compareTo); + Optional bp = pids + .stream() + .max(Identifier::compareTo); - if (bp.get().isUseOriginal() || bp.get().getPid().getValue() == null) { - return bp.get().getOriginalID().split("\\|")[0] + "|dedup_wf_001::" + DedupUtility.md5(bp.get().getOriginalID()); - } else { - return bp.get().getOriginalID().split("\\|")[0] + "|" + createPrefix(bp.get().getPid().getQualifier().getClassid()) + "::" + DedupUtility.md5(bp.get().getPid().getValue()); - } + if (bp.get().isUseOriginal() || bp.get().getPid().getValue() == null) { + return bp.get().getOriginalID().split("\\|")[0] + "|dedup_wf_001::" + + DedupUtility.md5(bp.get().getOriginalID()); + } else { + return bp.get().getOriginalID().split("\\|")[0] + "|" + + createPrefix(bp.get().getPid().getQualifier().getClassid()) + "::" + + DedupUtility.md5(bp.get().getPid().getValue()); + } - } + } - //pick the best pid from the entity. Returns a list (length 1) to save time in the call - public static List bestPidtoIdentifier(T entity) { + // pick the best pid from the entity. Returns a list (length 1) to save time in the call + public static List bestPidtoIdentifier(T entity) { - if (entity.getPid() == null || entity.getPid().size() == 0) - return Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())); + if (entity.getPid() == null || entity.getPid().size() == 0) + return Lists + .newArrayList( + new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), + EntityType.fromClass(entity.getClass()), entity.getId())); - Optional bp = entity.getPid().stream() - .filter(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()) != PidType.undefined) - .max(Comparator.comparing(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()))); + Optional bp = entity + .getPid() + .stream() + .filter(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()) != PidType.undefined) + .max(Comparator.comparing(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()))); - return bp.map(structuredProperty -> - Lists.newArrayList(new Identifier(structuredProperty, extractDate(entity, sdf), PidType.classidValueOf(structuredProperty.getQualifier().getClassid()), entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())) - ).orElseGet(() -> Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()))); + return bp + .map( + structuredProperty -> Lists + .newArrayList( + new Identifier(structuredProperty, extractDate(entity, new SimpleDateFormat("yyyy-MM-dd")), + PidType.classidValueOf(structuredProperty.getQualifier().getClassid()), + entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()))) + .orElseGet( + () -> Lists + .newArrayList( + new Identifier(new StructuredProperty(), new Date(), PidType.original, + entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()))); - } + } - //create the prefix (length = 12): dedup_+ pidType - public static String createPrefix(String pidType) { + // create the prefix (length = 12): dedup_+ pidType + public static String createPrefix(String pidType) { - StringBuilder prefix = new StringBuilder("dedup_" + pidType); + StringBuilder prefix = new StringBuilder("dedup_" + pidType); - while (prefix.length() < 12) { - prefix.append("_"); - } - return prefix.toString().substring(0, 12); + while (prefix.length() < 12) { + prefix.append("_"); + } + return prefix.toString().substring(0, 12); - } + } - //extracts the date from the record. If the date is not available or is not wellformed, it returns a base date: 00-01-01 - public static Date extractDate(T duplicate, SimpleDateFormat sdf){ + // extracts the date from the record. If the date is not available or is not wellformed, it returns a base date: + // 00-01-01 + public static Date extractDate(T duplicate, SimpleDateFormat sdf) { - String date = "2000-01-01"; - if (ModelSupport.isSubClass(duplicate, Result.class)) { - Result result = (Result) duplicate; - if (isWellformed(result.getDateofacceptance())){ - date = result.getDateofacceptance().getValue(); - } - } + String date = "2000-01-01"; + if (ModelSupport.isSubClass(duplicate, Result.class)) { + Result result = (Result) duplicate; + if (isWellformed(result.getDateofacceptance())) { + date = result.getDateofacceptance().getValue(); + } + } - try { - return sdf.parse(date); - } catch (ParseException e) { - return new Date(); - } + try { + return sdf.parse(date); + } catch (ParseException e) { + return new Date(); + } - } + } - public static boolean isWellformed(Field date) { - return date != null && StringUtils.isNotBlank(date.getValue()) && date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue()); - } + public static boolean isWellformed(Field date) { + return date != null && StringUtils.isNotBlank(date.getValue()) + && date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue()); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java index fd52d20f97..480b523412 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java @@ -1,132 +1,138 @@ -package eu.dnetlib.dhp.oa.dedup; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +package eu.dnetlib.dhp.oa.dedup; import java.io.Serializable; import java.util.Date; import java.util.List; -public class Identifier implements Serializable, Comparable{ +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; - StructuredProperty pid; - Date date; - PidType type; - List collectedFrom; - EntityType entityType; - String originalID; +public class Identifier implements Serializable, Comparable { - boolean useOriginal = false; //to know if the top identifier won because of the alphabetical order of the original ID + StructuredProperty pid; + Date date; + PidType type; + List collectedFrom; + EntityType entityType; + String originalID; - public Identifier(StructuredProperty pid, Date date, PidType type, List collectedFrom, EntityType entityType, String originalID) { - this.pid = pid; - this.date = date; - this.type = type; - this.collectedFrom = collectedFrom; - this.entityType = entityType; - this.originalID = originalID; - } + boolean useOriginal = false; // to know if the top identifier won because of the alphabetical order of the original + // ID - public StructuredProperty getPid() { - return pid; - } + public Identifier(StructuredProperty pid, Date date, PidType type, List collectedFrom, + EntityType entityType, String originalID) { + this.pid = pid; + this.date = date; + this.type = type; + this.collectedFrom = collectedFrom; + this.entityType = entityType; + this.originalID = originalID; + } - public void setPid(StructuredProperty pidValue) { - this.pid = pid; - } + public StructuredProperty getPid() { + return pid; + } - public Date getDate() { - return date; - } + public void setPid(StructuredProperty pidValue) { + this.pid = pid; + } - public void setDate(Date date) { - this.date = date; - } + public Date getDate() { + return date; + } - public PidType getType() { - return type; - } + public void setDate(Date date) { + this.date = date; + } - public void setType(PidType type) { - this.type = type; - } + public PidType getType() { + return type; + } - public List getCollectedFrom() { - return collectedFrom; - } + public void setType(PidType type) { + this.type = type; + } - public void setCollectedFrom(List collectedFrom) { - this.collectedFrom = collectedFrom; - } + public List getCollectedFrom() { + return collectedFrom; + } - public EntityType getEntityType() { - return entityType; - } + public void setCollectedFrom(List collectedFrom) { + this.collectedFrom = collectedFrom; + } - public void setEntityType(EntityType entityType) { - this.entityType = entityType; - } + public EntityType getEntityType() { + return entityType; + } - public String getOriginalID() { - return originalID; - } + public void setEntityType(EntityType entityType) { + this.entityType = entityType; + } - public void setOriginalID(String originalID) { - this.originalID = originalID; - } + public String getOriginalID() { + return originalID; + } - public boolean isUseOriginal() { - return useOriginal; - } + public void setOriginalID(String originalID) { + this.originalID = originalID; + } - public void setUseOriginal(boolean useOriginal) { - this.useOriginal = useOriginal; - } + public boolean isUseOriginal() { + return useOriginal; + } - @Override - public int compareTo(Identifier i) { - //priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) alphabetical order of the originalID - if (this.getType().compareTo(i.getType()) == 0){ //same type - if (entityType == EntityType.publication) { - if (isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID)) - return 1; - if (isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID)) - return -1; - } - if (entityType == EntityType.dataset) { - if (isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID)) - return 1; - if (isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID)) - return -1; - } + public void setUseOriginal(boolean useOriginal) { + this.useOriginal = useOriginal; + } - if (this.getDate().compareTo(date) == 0) {//same date + @Override + public int compareTo(Identifier i) { + // priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) + // alphabetical order of the originalID + if (this.getType().compareTo(i.getType()) == 0) { // same type + if (entityType == EntityType.publication) { + if (isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID) + && !isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID)) + return 1; + if (isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID) + && !isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID)) + return -1; + } + if (entityType == EntityType.dataset) { + if (isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID) + && !isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID)) + return 1; + if (isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID) + && !isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID)) + return -1; + } - if (this.originalID.compareTo(i.originalID) > 0) - this.useOriginal = true; - else - i.setUseOriginal(true); + if (this.getDate().compareTo(date) == 0) {// same date - //the minus because we need to take the alphabetically lower id - return -this.originalID.compareTo(i.originalID); - } - else - //the minus is because we need to take the elder date - return -this.getDate().compareTo(date); - } - else { - return this.getType().compareTo(i.getType()); - } + if (this.originalID.compareTo(i.originalID) > 0) + this.useOriginal = true; + else + i.setUseOriginal(true); - } + // the minus because we need to take the alphabetically lower id + return -this.originalID.compareTo(i.originalID); + } else + // the minus is because we need to take the elder date + return -this.getDate().compareTo(date); + } else { + return this.getType().compareTo(i.getType()); + } - public boolean isFromDatasourceID(List collectedFrom, String dsId){ + } - for(KeyValue cf: collectedFrom) { - if(cf.getKey().equals(dsId)) - return true; - } - return false; - } + public boolean isFromDatasourceID(List collectedFrom, String dsId) { + + for (KeyValue cf : collectedFrom) { + if (cf.getKey().equals(dsId)) + return true; + } + return false; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java index a7d8ead0b3..84dfecd624 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java @@ -1,83 +1,98 @@ + package eu.dnetlib.dhp.oa.dedup; import java.io.Serializable; public class OrgSimRel implements Serializable { - String local_id; - String oa_original_id; - String oa_name; - String oa_acronym; - String oa_country; - String oa_url; - String oa_collectedfrom; + String local_id; + String oa_original_id; + String oa_name; + String oa_acronym; + String oa_country; + String oa_url; + String oa_collectedfrom; - public OrgSimRel() { - } + public OrgSimRel() { + } - public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, String oa_url, String oa_collectedfrom) { - this.local_id = local_id; - this.oa_original_id = oa_original_id; - this.oa_name = oa_name; - this.oa_acronym = oa_acronym; - this.oa_country = oa_country; - this.oa_url = oa_url; - this.oa_collectedfrom = oa_collectedfrom; - } + public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, + String oa_url, String oa_collectedfrom) { + this.local_id = local_id; + this.oa_original_id = oa_original_id; + this.oa_name = oa_name; + this.oa_acronym = oa_acronym; + this.oa_country = oa_country; + this.oa_url = oa_url; + this.oa_collectedfrom = oa_collectedfrom; + } - public String getLocal_id() { - return local_id; - } + public String getLocal_id() { + return local_id; + } - public void setLocal_id(String local_id) { - this.local_id = local_id; - } + public void setLocal_id(String local_id) { + this.local_id = local_id; + } - public String getOa_original_id() { - return oa_original_id; - } + public String getOa_original_id() { + return oa_original_id; + } - public void setOa_original_id(String oa_original_id) { - this.oa_original_id = oa_original_id; - } + public void setOa_original_id(String oa_original_id) { + this.oa_original_id = oa_original_id; + } - public String getOa_name() { - return oa_name; - } + public String getOa_name() { + return oa_name; + } - public void setOa_name(String oa_name) { - this.oa_name = oa_name; - } + public void setOa_name(String oa_name) { + this.oa_name = oa_name; + } - public String getOa_acronym() { - return oa_acronym; - } + public String getOa_acronym() { + return oa_acronym; + } - public void setOa_acronym(String oa_acronym) { - this.oa_acronym = oa_acronym; - } + public void setOa_acronym(String oa_acronym) { + this.oa_acronym = oa_acronym; + } - public String getOa_country() { - return oa_country; - } + public String getOa_country() { + return oa_country; + } - public void setOa_country(String oa_country) { - this.oa_country = oa_country; - } + public void setOa_country(String oa_country) { + this.oa_country = oa_country; + } - public String getOa_url() { - return oa_url; - } + public String getOa_url() { + return oa_url; + } - public void setOa_url(String oa_url) { - this.oa_url = oa_url; - } + public void setOa_url(String oa_url) { + this.oa_url = oa_url; + } - public String getOa_collectedfrom() { - return oa_collectedfrom; - } + public String getOa_collectedfrom() { + return oa_collectedfrom; + } - public void setOa_collectedfrom(String oa_collectedfrom) { - this.oa_collectedfrom = oa_collectedfrom; - } + public void setOa_collectedfrom(String oa_collectedfrom) { + this.oa_collectedfrom = oa_collectedfrom; + } + + @Override + public String toString() { + return "OrgSimRel{" + + "local_id='" + local_id + '\'' + + ", oa_original_id='" + oa_original_id + '\'' + + ", oa_name='" + oa_name + '\'' + + ", oa_acronym='" + oa_acronym + '\'' + + ", oa_country='" + oa_country + '\'' + + ", oa_url='" + oa_url + '\'' + + ", oa_collectedfrom='" + oa_collectedfrom + '\'' + + '}'; + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java index ab5c498680..c3241bac64 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java @@ -1,25 +1,17 @@ + package eu.dnetlib.dhp.oa.dedup; public enum PidType { - //from the less to the more important - undefined, - original, - orcid, - ror, - grid, - pdb, - arXiv, - pmid, - doi; + // from the less to the more important + undefined, original, orcid, ror, grid, pdb, arXiv, pmid, doi; - public static PidType classidValueOf(String s){ - try { - return PidType.valueOf(s); - } - catch (Exception e) { - return PidType.undefined; - } - } + public static PidType classidValueOf(String s) { + try { + return PidType.valueOf(s); + } catch (Exception e) { + return PidType.undefined; + } + } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java index b35f9c54ad..f9e6448b03 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java @@ -1,21 +1,5 @@ -package eu.dnetlib.dhp.oa.dedup; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.*; -import org.dom4j.DocumentException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; +package eu.dnetlib.dhp.oa.dedup; import java.io.IOException; import java.util.ArrayList; @@ -24,153 +8,177 @@ import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.*; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import scala.Tuple2; + public class SparkCollectSimRels extends AbstractSparkAction { - private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class); + private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class); - Dataset simGroupsDS; - Dataset groupsDS; + Dataset simGroupsDS; + Dataset groupsDS; - public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset simGroupsDS, Dataset groupsDS) { - super(parser, spark); - this.simGroupsDS = simGroupsDS; - this.groupsDS = groupsDS; - } + public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset simGroupsDS, + Dataset groupsDS) { + super(parser, spark); + this.simGroupsDS = simGroupsDS; + this.groupsDS = groupsDS; + } - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkBlockStats.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkBlockStats.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); + parser.parseArgument(args); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); - final String dbPassword = parser.get("postgresPassword"); + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); - SparkSession spark = getSparkSession(conf); + SparkSession spark = getSparkSession(conf); - DataFrameReader readOptions = spark.read() - .format("jdbc") - .option("url", dbUrl) - .option("user", dbUser) - .option("password", dbPassword); + DataFrameReader readOptions = spark + .read() + .format("jdbc") + .option("url", dbUrl) + .option("user", dbUser) + .option("password", dbPassword); - new SparkCollectSimRels( - parser, - spark, - readOptions.option("dbtable", "similarity_groups").load(), - readOptions.option("dbtable", "groups").load() - ).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + new SparkCollectSimRels( + parser, + spark, + readOptions.option("dbtable", "similarity_groups").load(), + readOptions.option("dbtable", "groups").load()) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - @Override - void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException { + @Override + void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException { - // read oozie parameters - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final int numPartitions = Optional - .ofNullable(parser.get("numPartitions")) - .map(Integer::valueOf) - .orElse(NUM_PARTITIONS); - final String dbUrl = parser.get("postgresUrl"); - final String dbUser = parser.get("postgresUser"); + // read oozie parameters + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); - log.info("numPartitions: '{}'", numPartitions); - log.info("isLookUpUrl: '{}'", isLookUpUrl); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); - log.info("postgresUser: {}", dbUser); - log.info("postgresUrl: {}", dbUrl); - log.info("postgresPassword: xxx"); + log.info("numPartitions: '{}'", numPartitions); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("postgresUser: {}", dbUser); + log.info("postgresUrl: {}", dbUrl); + log.info("postgresPassword: xxx"); - JavaPairRDD> similarityGroup = - simGroupsDS - .toJavaRDD() - .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1))) - .groupByKey() - .mapToPair(i -> new Tuple2<>(i._1(), StreamSupport.stream(i._2().spliterator(), false) - .collect(Collectors.toList()))); + JavaPairRDD> similarityGroup = simGroupsDS + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1))) + .groupByKey() + .mapToPair( + i -> new Tuple2<>(i._1(), StreamSupport + .stream(i._2().spliterator(), false) + .collect(Collectors.toList()))); - JavaPairRDD groupIds = - groupsDS - .toJavaRDD() - .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1))); + JavaPairRDD groupIds = groupsDS + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1))); - JavaRDD, List>> groups = similarityGroup - .leftOuterJoin(groupIds) - .filter(g -> g._2()._2().isPresent()) - .map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1())); + JavaRDD, List>> groups = similarityGroup + .leftOuterJoin(groupIds) + .filter(g -> g._2()._2().isPresent()) + .map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1())); - JavaRDD relations = groups.flatMap(g -> { - String firstId = g._2().get(0); - List rels = new ArrayList<>(); + JavaRDD relations = groups.flatMap(g -> { + String firstId = g._2().get(0); + List rels = new ArrayList<>(); - for (String id : g._2()) { - if (!firstId.equals(id)) - rels.add(createSimRel(firstId, id, g._1()._2())); - } + for (String id : g._2()) { + if (!firstId.equals(id)) + rels.add(createSimRel(firstId, id, g._1()._2())); + } - return rels.iterator(); - }); + return rels.iterator(); + }); - Dataset resultRelations = spark.createDataset( - relations.filter(r -> r.getRelType().equals("resultResult")).rdd(), - Encoders.bean(Relation.class) - ).repartition(numPartitions); + Dataset resultRelations = spark + .createDataset( + relations.filter(r -> r.getRelType().equals("resultResult")).rdd(), + Encoders.bean(Relation.class)) + .repartition(numPartitions); - Dataset organizationRelations = spark.createDataset( - relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(), - Encoders.bean(Relation.class) - ).repartition(numPartitions); + Dataset organizationRelations = spark + .createDataset( + relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(), + Encoders.bean(Relation.class)) + .repartition(numPartitions); - for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { - switch(dedupConf.getWf().getSubEntityValue()){ - case "organization": - savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization"); - break; - default: - savePostgresRelation(resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue()); - break; - } - } + for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) { + switch (dedupConf.getWf().getSubEntityValue()) { + case "organization": + savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization"); + break; + default: + savePostgresRelation( + resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue()); + break; + } + } - } + } - private Relation createSimRel(String source, String target, String entity) { - final Relation r = new Relation(); - r.setSubRelType("dedupSimilarity"); - r.setRelClass("isSimilarTo"); - r.setDataInfo(new DataInfo()); + private Relation createSimRel(String source, String target, String entity) { + final Relation r = new Relation(); + r.setSubRelType("dedupSimilarity"); + r.setRelClass("isSimilarTo"); + r.setDataInfo(new DataInfo()); - switch (entity) { - case "result": - r.setSource("50|" + source); - r.setTarget("50|" + target); - r.setRelType("resultResult"); - break; - case "organization": - r.setSource("20|" + source); - r.setTarget("20|" + target); - r.setRelType("organizationOrganization"); - break; - default: - throw new IllegalArgumentException("unmanaged entity type: " + entity); - } - return r; - } + switch (entity) { + case "result": + r.setSource("50|" + source); + r.setTarget("50|" + target); + r.setRelType("resultResult"); + break; + case "organization": + r.setSource("20|" + source); + r.setTarget("20|" + target); + r.setRelType("organizationOrganization"); + break; + default: + throw new IllegalArgumentException("unmanaged entity type: " + entity); + } + return r; + } - private void savePostgresRelation(Dataset newRelations, String workingPath, String actionSetId, String entityType) { - newRelations - .write() - .mode(SaveMode.Append) - .parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType)); - } + private void savePostgresRelation(Dataset newRelations, String workingPath, String actionSetId, + String entityType) { + newRelations + .write() + .mode(SaveMode.Append) + .parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType)); + } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index ce6226ddee..1122b42ebd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -104,13 +104,13 @@ public class SparkCreateMergeRels extends AbstractSparkAction { .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .mapToPair((PairFunction) s -> new Tuple2<>(hash(s), s)); - final RDD> edgeRdd = spark - .read() - .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) - .as(Encoders.bean(Relation.class)) - .javaRDD() - .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) - .rdd(); + final RDD> edgeRdd = spark + .read() + .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) + .as(Encoders.bean(Relation.class)) + .javaRDD() + .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) + .rdd(); final Dataset mergeRels = spark .createDataset( diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index babccefb4c..d5033d4253 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -100,17 +100,17 @@ public class SparkCreateSimRels extends AbstractSparkAction { .repartition(numPartitions); // create relations by comparing only elements in the same group - spark.createDataset( - Deduper - .computeRelations(sc, blocks, dedupConf) - .map(t -> createSimRel(t._1(), t._2(), entity)) - .repartition(numPartitions) - .rdd(), - Encoders.bean(Relation.class) - ) - .write() - .mode(SaveMode.Append) - .parquet(outputPath); + spark + .createDataset( + Deduper + .computeRelations(sc, blocks, dedupConf) + .map(t -> createSimRel(t._1(), t._2(), entity)) + .repartition(numPartitions) + .rdd(), + Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .parquet(outputPath); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java index dff1bbb0de..d6c548de31 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -1,13 +1,11 @@ + package eu.dnetlib.dhp.oa.dedup; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; +import static jdk.nashorn.internal.objects.NativeDebug.map; + +import java.io.IOException; +import java.util.*; + import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -15,6 +13,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; @@ -24,145 +23,172 @@ import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import scala.Tuple2; public class SparkPrepareOrgRels extends AbstractSparkAction { - private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class); + private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class); - public static final String ROOT_TRUST = "0.8"; - public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup"; - public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions"; + public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) { + super(parser, spark); + } - public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) { - super(parser, spark); - } + public static void main(String[] args) throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"))); + parser.parseArgument(args); - public static void main(String[] args) throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"))); - parser.parseArgument(args); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + new SparkPrepareOrgRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + } - new SparkCreateDedupRecord(parser, getSparkSession(conf)) - .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); - } + @Override + public void run(ISLookUpService isLookUpService) throws IOException { - @Override - public void run(ISLookUpService isLookUpService) throws IOException { + final String graphBasePath = parser.get("graphBasePath"); + final String isLookUpUrl = parser.get("isLookUpUrl"); + final String actionSetId = parser.get("actionSetId"); + final String workingPath = parser.get("workingPath"); + final int numConnections = Optional + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); - final String graphBasePath = parser.get("graphBasePath"); - final String isLookUpUrl = parser.get("isLookUpUrl"); - final String actionSetId = parser.get("actionSetId"); - final String workingPath = parser.get("workingPath"); - final String apiUrl = parser.get("apiUrl"); - final String dbUrl = parser.get("dbUrl"); - final String dbTable = parser.get("dbTable"); - final String dbUser = parser.get("dbUser"); - final String dbPwd = parser.get("dbPwd"); + final String apiUrl = Optional + .ofNullable(parser.get("apiUrl")) + .orElse(""); - log.info("graphBasePath: '{}'", graphBasePath); - log.info("isLookUpUrl: '{}'", isLookUpUrl); - log.info("actionSetId: '{}'", actionSetId); - log.info("workingPath: '{}'", workingPath); - log.info("apiUrl: '{}'", apiUrl); - log.info("dbUrl: '{}'", dbUrl); - log.info("dbUser: '{}'", dbUser); - log.info("table: '{}'", dbTable); - log.info("dbPwd: '{}'", "xxx"); + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); - final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); - final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization"); + log.info("graphBasePath: '{}'", graphBasePath); + log.info("isLookUpUrl: '{}'", isLookUpUrl); + log.info("actionSetId: '{}'", actionSetId); + log.info("workingPath: '{}'", workingPath); + log.info("numPartitions: '{}'", numConnections); + log.info("apiUrl: '{}'", apiUrl); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); - Dataset relations = createRelations(spark, mergeRelPath, entityPath); + final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization"); + final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization"); - final Properties connectionProperties = new Properties(); - connectionProperties.put("user", dbUser); - connectionProperties.put("password", dbPwd); + Dataset relations = createRelations(spark, mergeRelPath, entityPath); - relations.write().mode(SaveMode.Overwrite).jdbc(dbUrl, dbTable, connectionProperties); + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); - if (!apiUrl.isEmpty()) - updateSimRels(apiUrl); + relations + .repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, dbTable, connectionProperties); - } + if (!apiUrl.isEmpty()) + updateSimRels(apiUrl); - public static Dataset createRelations( - final SparkSession spark, - final String mergeRelsPath, - final String entitiesPath) { + } - // - Dataset> entities = spark - .read() - .textFile(entitiesPath) - .map( - (MapFunction>) it -> { - Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); - return new Tuple2<>(entity.getId(), entity); - }, - Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); + public static Dataset createRelations( + final SparkSession spark, + final String mergeRelsPath, + final String entitiesPath) { - Dataset> relations = spark.createDataset( - spark - .read() - .load(mergeRelsPath) - .as(Encoders.bean(Relation.class)) - .where("relClass == 'merges'") - .toJavaRDD() - .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) - .groupByKey() - .flatMap(g -> { - List> rels = new ArrayList<>(); - for (String id1 : g._2()) { - for (String id2 : g._2()) { - if (!id1.equals(id2)) - if (id1.contains("openorgs")) - rels.add(new Tuple2<>(id1, id2)); - } - } - return rels.iterator(); - }).rdd(), - Encoders.tuple(Encoders.STRING(), Encoders.STRING())); + // + Dataset> entities = spark + .read() + .textFile(entitiesPath) + .map( + (MapFunction>) it -> { + Organization entity = OBJECT_MAPPER.readValue(it, Organization.class); + return new Tuple2<>(entity.getId(), entity); + }, + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class))); - return relations - .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") - .map( - (MapFunction, Tuple2>, OrgSimRel>)r -> - new OrgSimRel( - r._1()._2(), - r._2()._2().getOriginalId().get(0), - r._2()._2().getLegalname().getValue(), - r._2()._2().getLegalshortname().getValue(), - r._2()._2().getCountry().getClassid(), - r._2()._2().getWebsiteurl().getValue(), - r._2()._2().getCollectedfrom().get(0).getValue() - ), - Encoders.bean(OrgSimRel.class) - ); + Dataset> relations = spark + .createDataset( + spark + .read() + .load(mergeRelsPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .toJavaRDD() + .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget())) + .groupByKey() + .flatMap(g -> { + List> rels = new ArrayList<>(); + for (String id1 : g._2()) { + for (String id2 : g._2()) { + if (!id1.equals(id2)) + if (id1.contains("openorgs____") && !id2.contains("openorgsmesh")) + rels.add(new Tuple2<>(id1, id2)); + } + } + return rels.iterator(); + }) + .rdd(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING())); - } + Dataset> relations2 = relations // + .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel( + r._1()._1(), + r._2()._2().getOriginalId().get(0), + r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "", + r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "", + r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "", + r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "", + r._2()._2().getCollectedfrom().get(0).getValue()), + Encoders.bean(OrgSimRel.class)) + .map( + (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o), + Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class))); - private static String updateSimRels(final String apiUrl) throws IOException { - final HttpGet req = new HttpGet(apiUrl); - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - return IOUtils.toString(response.getEntity().getContent()); - } - } - } + return relations2 + .joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner") + .map( + (MapFunction, Tuple2>, OrgSimRel>) r -> { + OrgSimRel orgSimRel = r._1()._2(); + orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0)); + return orgSimRel; + }, + Encoders.bean(OrgSimRel.class)); + + } + + private static String updateSimRels(final String apiUrl) throws IOException { + + log.info("Updating simrels on the portal"); + + final HttpGet req = new HttpGet(apiUrl); + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml index 82ecbc46bb..ec9967d6a0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + graphBasePath @@ -24,10 +24,6 @@ cutConnectedComponent max number of elements in a connected component - - apiUrl - the url for the APIs of the openorgs service - dbUrl the url of the database @@ -109,6 +105,16 @@ + + + + + + + -pb + ${graphBasePath}/relation + ${workingPath}/${actionSetId}/organization_simrel + @@ -136,16 +142,6 @@ --workingPath${workingPath} --numPartitions8000 - - - - - - - -pb - ${graphBasePath}/relation - ${workingPath}/organization_simrel - @@ -203,6 +199,7 @@ --dbTable${dbTable} --dbUser${dbUser} --dbPwd${dbPwd} + --numConnections20 diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json index bcca48a156..b70d1af281 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json @@ -23,11 +23,17 @@ "paramDescription": "the id of the actionset (orchestrator)", "paramRequired": true }, + { + "paramName": "nc", + "paramLongName": "numConnections", + "paramDescription": "number of connections to the postgres db (for the write operation)", + "paramRequired": false + }, { "paramName": "au", "paramLongName": "apiUrl", "paramDescription": "the url for the APIs of the openorgs service", - "paramRequired": true + "paramRequired": false }, { "paramName": "du", diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index 4317515840..30b213ff22 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -138,10 +138,10 @@ public class EntityMergerTest implements Serializable { public void publicationMergerTest3() throws InstantiationException, IllegalAccessException { Publication pub_merged = DedupRecordFactory - .entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class); + .entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class); // verify id - assertEquals( "50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId()); + assertEquals("50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId()); } @@ -149,7 +149,7 @@ public class EntityMergerTest implements Serializable { public void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException { Publication pub_merged = DedupRecordFactory - .entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class); + .entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class); // verify id assertEquals("50|dedup_wf_001::2d2bbbbcfb285e3fb3590237b79e2fa8", pub_merged.getId()); @@ -160,7 +160,7 @@ public class EntityMergerTest implements Serializable { public void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException { Publication pub_merged = DedupRecordFactory - .entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class); + .entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class); // verify id assertEquals("50|dedup_wf_001::584b89679c3ccd1015b647ec63cc2699", pub_merged.getId()); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 59c8505917..d9b5e30ebc 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -1,11 +1,18 @@ + package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; +import static java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -21,19 +28,16 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.nio.file.Paths; - -import static java.nio.file.Files.createTempDirectory; -import static org.apache.spark.sql.functions.count; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.lenient; - @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SparkDedupTest implements Serializable { @@ -48,7 +52,7 @@ public class SparkDedupTest implements Serializable { private static String testOutputBasePath; private static String testDedupGraphBasePath; private static final String testActionSetId = "test-orchestrator"; - private static String testDedupAssertionsBasePath; + private static String testDedupAssertionsBasePath; @BeforeAll public static void cleanUp() throws IOException, URISyntaxException { @@ -64,9 +68,9 @@ public class SparkDedupTest implements Serializable { .toAbsolutePath() .toString(); testDedupAssertionsBasePath = Paths - .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI()) - .toFile() - .getAbsolutePath(); + .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI()) + .toFile() + .getAbsolutePath(); FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); @@ -82,7 +86,7 @@ public class SparkDedupTest implements Serializable { jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - } + } @BeforeEach public void setUp() throws IOException, ISLookUpException { @@ -165,98 +169,98 @@ public class SparkDedupTest implements Serializable { new SparkCreateSimRels(parser, spark).run(isLookUpService); - long orgs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") - .count(); + long orgs_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .count(); - long pubs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") - .count(); + long pubs_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") + .count(); - long sw_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") - .count(); + long sw_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") + .count(); - long ds_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") - .count(); + long ds_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") + .count(); - long orp_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") - .count(); + long orp_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") + .count(); - assertEquals(3432, orgs_simrel); - assertEquals(7152, pubs_simrel); + assertEquals(3082, orgs_simrel); + assertEquals(7036, pubs_simrel); assertEquals(344, sw_simrel); - assertEquals(458, ds_simrel); + assertEquals(442, ds_simrel); assertEquals(6750, orp_simrel); } - @Test - @Order(2) - public void collectSimRelsTest() throws Exception { - ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - SparkCreateSimRels.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); - parser - .parseArgument( - new String[] { - "-asi", testActionSetId, - "-la", "lookupurl", - "-w", testOutputBasePath, - "-np", "50", - "-purl", "jdbc:postgresql://localhost:5432/dnet_dedup", - "-pusr", "postgres_url", - "-ppwd", "" - }); + @Test + @Order(2) + public void collectSimRelsTest() throws Exception { + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCollectSimRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json"))); + parser + .parseArgument( + new String[] { + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50", + "-purl", "jdbc:postgresql://localhost:5432/dnet_dedup", + "-pusr", "postgres_user", + "-ppwd", "" + }); - new SparkCollectSimRels( - parser, - spark, - spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"), - spark.read().load(testDedupAssertionsBasePath + "/groups") - ).run(null); + new SparkCollectSimRels( + parser, + spark, + spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"), + spark.read().load(testDedupAssertionsBasePath + "/groups")) + .run(isLookUpService); - long orgs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") - .count(); + long orgs_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .count(); - long pubs_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") - .count(); + long pubs_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") + .count(); - long sw_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") - .count(); + long sw_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel") + .count(); - long ds_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") - .count(); + long ds_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") + .count(); - long orp_simrel = spark - .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") - .count(); + long orp_simrel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") + .count(); - assertEquals(4022, orgs_simrel); - assertEquals(10575, pubs_simrel); - assertEquals(3767, sw_simrel); - assertEquals(3881, ds_simrel); - assertEquals(10173, orp_simrel); + assertEquals(3672, orgs_simrel); + assertEquals(10459, pubs_simrel); + assertEquals(3767, sw_simrel); + assertEquals(3865, ds_simrel); + assertEquals(10173, orp_simrel); - } + } @Test @Order(3) @@ -402,8 +406,8 @@ public class SparkDedupTest implements Serializable { .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") .count(); - assertEquals(1276, orgs_mergerel); - assertEquals(1442, pubs_mergerel); + assertEquals(1272, orgs_mergerel); + assertEquals(1438, pubs_mergerel); assertEquals(288, sw_mergerel); assertEquals(472, ds_mergerel); assertEquals(718, orp_mergerel); @@ -449,10 +453,10 @@ public class SparkDedupTest implements Serializable { testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord") .count(); - assertEquals(82, orgs_deduprecord); - assertEquals(66, pubs_deduprecord); + assertEquals(84, orgs_deduprecord); + assertEquals(65, pubs_deduprecord); assertEquals(51, sw_deduprecord); - assertEquals(96, ds_deduprecord); + assertEquals(97, ds_deduprecord); assertEquals(89, orp_deduprecord); } @@ -532,12 +536,12 @@ public class SparkDedupTest implements Serializable { .distinct() .count(); - assertEquals(897, publications); - assertEquals(835, organizations); + assertEquals(896, publications); + assertEquals(837, organizations); assertEquals(100, projects); assertEquals(100, datasource); assertEquals(200, softwares); - assertEquals(388, dataset); + assertEquals(389, dataset); assertEquals(517, otherresearchproduct); long deletedOrgs = jsc @@ -592,7 +596,7 @@ public class SparkDedupTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4866, relations); + assertEquals(4858, relations); // check deletedbyinference final Dataset mergeRels = spark @@ -641,11 +645,11 @@ public class SparkDedupTest implements Serializable { assertEquals(expected_unique, rel.distinct().count()); } -// @AfterAll -// public static void finalCleanUp() throws IOException { -// FileUtils.deleteDirectory(new File(testOutputBasePath)); -// FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); -// } + @AfterAll + public static void finalCleanUp() throws IOException { + FileUtils.deleteDirectory(new File(testOutputBasePath)); + FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + } public boolean isDeletedByInference(String s) { return s.contains("\"deletedbyinference\":true"); diff --git a/pom.xml b/pom.xml index cec3dd75a0..f7300260cd 100644 --- a/pom.xml +++ b/pom.xml @@ -315,7 +315,7 @@ eu.dnetlib dnet-pace-core - 4.0.4 + 4.0.5 eu.dnetlib