diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index 03ddbcf4c8..ff11c66e05 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -90,6 +90,10 @@
com.fasterxml.jackson.core
jackson-core
+
+ org.apache.httpcomponents
+ httpclient
+
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 74cecb7b6b..9a11277645 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -29,6 +29,7 @@ import eu.dnetlib.pace.config.DedupConfig;
abstract class AbstractSparkAction implements Serializable {
protected static final int NUM_PARTITIONS = 1000;
+ protected static final int NUM_CONNECTIONS = 20;
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index 0fc393ea5f..50dda887b6 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -1,12 +1,10 @@
package eu.dnetlib.dhp.oa.dedup;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -15,11 +13,15 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import scala.Tuple2;
public class DedupRecordFactory {
@@ -80,14 +82,14 @@ public class DedupRecordFactory {
final Collection dates = Lists.newArrayList();
final List> authors = Lists.newArrayList();
- final List bestPids = Lists.newArrayList(); //best pids list
+ final List bestPids = Lists.newArrayList(); // best pids list
entities
.forEachRemaining(
t -> {
T duplicate = t._2();
- //prepare the list of pids to use for the id generation
+ // prepare the list of pids to use for the id generation
bestPids.addAll(IdGenerator.bestPidtoIdentifier(duplicate));
entity.mergeFrom(duplicate);
@@ -115,5 +117,4 @@ public class DedupRecordFactory {
return entity;
}
-
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
index 2d203a1b19..2916e063d2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/IdGenerator.java
@@ -1,90 +1,112 @@
-package eu.dnetlib.dhp.oa.dedup;
-import com.google.common.collect.Lists;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
-import org.apache.commons.lang.NullArgumentException;
-import org.apache.commons.lang.StringUtils;
+package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
+import org.apache.commons.lang.NullArgumentException;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Lists;
+
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
public class IdGenerator implements Serializable {
- private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
- public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
- public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
+ public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
+ public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
- //pick the best pid from the list (consider date and pidtype)
- public static String generate(List pids, String defaultID) {
- if (pids == null || pids.size() == 0)
- return defaultID;
+ // pick the best pid from the list (consider date and pidtype)
+ public static String generate(List pids, String defaultID) {
+ if (pids == null || pids.size() == 0)
+ return defaultID;
- Optional bp = pids.stream()
- .max(Identifier::compareTo);
+ Optional bp = pids
+ .stream()
+ .max(Identifier::compareTo);
- if (bp.get().isUseOriginal() || bp.get().getPid().getValue() == null) {
- return bp.get().getOriginalID().split("\\|")[0] + "|dedup_wf_001::" + DedupUtility.md5(bp.get().getOriginalID());
- } else {
- return bp.get().getOriginalID().split("\\|")[0] + "|" + createPrefix(bp.get().getPid().getQualifier().getClassid()) + "::" + DedupUtility.md5(bp.get().getPid().getValue());
- }
+ if (bp.get().isUseOriginal() || bp.get().getPid().getValue() == null) {
+ return bp.get().getOriginalID().split("\\|")[0] + "|dedup_wf_001::"
+ + DedupUtility.md5(bp.get().getOriginalID());
+ } else {
+ return bp.get().getOriginalID().split("\\|")[0] + "|"
+ + createPrefix(bp.get().getPid().getQualifier().getClassid()) + "::"
+ + DedupUtility.md5(bp.get().getPid().getValue());
+ }
- }
+ }
- //pick the best pid from the entity. Returns a list (length 1) to save time in the call
- public static List bestPidtoIdentifier(T entity) {
+ // pick the best pid from the entity. Returns a list (length 1) to save time in the call
+ public static List bestPidtoIdentifier(T entity) {
- if (entity.getPid() == null || entity.getPid().size() == 0)
- return Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()));
+ if (entity.getPid() == null || entity.getPid().size() == 0)
+ return Lists
+ .newArrayList(
+ new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(),
+ EntityType.fromClass(entity.getClass()), entity.getId()));
- Optional bp = entity.getPid().stream()
- .filter(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()) != PidType.undefined)
- .max(Comparator.comparing(pid -> PidType.classidValueOf(pid.getQualifier().getClassid())));
+ Optional bp = entity
+ .getPid()
+ .stream()
+ .filter(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()) != PidType.undefined)
+ .max(Comparator.comparing(pid -> PidType.classidValueOf(pid.getQualifier().getClassid())));
- return bp.map(structuredProperty ->
- Lists.newArrayList(new Identifier(structuredProperty, extractDate(entity, sdf), PidType.classidValueOf(structuredProperty.getQualifier().getClassid()), entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()))
- ).orElseGet(() -> Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())));
+ return bp
+ .map(
+ structuredProperty -> Lists
+ .newArrayList(
+ new Identifier(structuredProperty, extractDate(entity, new SimpleDateFormat("yyyy-MM-dd")),
+ PidType.classidValueOf(structuredProperty.getQualifier().getClassid()),
+ entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())))
+ .orElseGet(
+ () -> Lists
+ .newArrayList(
+ new Identifier(new StructuredProperty(), new Date(), PidType.original,
+ entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())));
- }
+ }
- //create the prefix (length = 12): dedup_+ pidType
- public static String createPrefix(String pidType) {
+ // create the prefix (length = 12): dedup_+ pidType
+ public static String createPrefix(String pidType) {
- StringBuilder prefix = new StringBuilder("dedup_" + pidType);
+ StringBuilder prefix = new StringBuilder("dedup_" + pidType);
- while (prefix.length() < 12) {
- prefix.append("_");
- }
- return prefix.toString().substring(0, 12);
+ while (prefix.length() < 12) {
+ prefix.append("_");
+ }
+ return prefix.toString().substring(0, 12);
- }
+ }
- //extracts the date from the record. If the date is not available or is not wellformed, it returns a base date: 00-01-01
- public static Date extractDate(T duplicate, SimpleDateFormat sdf){
+ // extracts the date from the record. If the date is not available or is not wellformed, it returns a base date:
+ // 00-01-01
+ public static Date extractDate(T duplicate, SimpleDateFormat sdf) {
- String date = "2000-01-01";
- if (ModelSupport.isSubClass(duplicate, Result.class)) {
- Result result = (Result) duplicate;
- if (isWellformed(result.getDateofacceptance())){
- date = result.getDateofacceptance().getValue();
- }
- }
+ String date = "2000-01-01";
+ if (ModelSupport.isSubClass(duplicate, Result.class)) {
+ Result result = (Result) duplicate;
+ if (isWellformed(result.getDateofacceptance())) {
+ date = result.getDateofacceptance().getValue();
+ }
+ }
- try {
- return sdf.parse(date);
- } catch (ParseException e) {
- return new Date();
- }
+ try {
+ return sdf.parse(date);
+ } catch (ParseException e) {
+ return new Date();
+ }
- }
+ }
- public static boolean isWellformed(Field date) {
- return date != null && StringUtils.isNotBlank(date.getValue()) && date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
- }
+ public static boolean isWellformed(Field date) {
+ return date != null && StringUtils.isNotBlank(date.getValue())
+ && date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java
index fd52d20f97..480b523412 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Identifier.java
@@ -1,132 +1,138 @@
-package eu.dnetlib.dhp.oa.dedup;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
-public class Identifier implements Serializable, Comparable{
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
- StructuredProperty pid;
- Date date;
- PidType type;
- List collectedFrom;
- EntityType entityType;
- String originalID;
+public class Identifier implements Serializable, Comparable {
- boolean useOriginal = false; //to know if the top identifier won because of the alphabetical order of the original ID
+ StructuredProperty pid;
+ Date date;
+ PidType type;
+ List collectedFrom;
+ EntityType entityType;
+ String originalID;
- public Identifier(StructuredProperty pid, Date date, PidType type, List collectedFrom, EntityType entityType, String originalID) {
- this.pid = pid;
- this.date = date;
- this.type = type;
- this.collectedFrom = collectedFrom;
- this.entityType = entityType;
- this.originalID = originalID;
- }
+ boolean useOriginal = false; // to know if the top identifier won because of the alphabetical order of the original
+ // ID
- public StructuredProperty getPid() {
- return pid;
- }
+ public Identifier(StructuredProperty pid, Date date, PidType type, List collectedFrom,
+ EntityType entityType, String originalID) {
+ this.pid = pid;
+ this.date = date;
+ this.type = type;
+ this.collectedFrom = collectedFrom;
+ this.entityType = entityType;
+ this.originalID = originalID;
+ }
- public void setPid(StructuredProperty pidValue) {
- this.pid = pid;
- }
+ public StructuredProperty getPid() {
+ return pid;
+ }
- public Date getDate() {
- return date;
- }
+ public void setPid(StructuredProperty pidValue) {
+ this.pid = pid;
+ }
- public void setDate(Date date) {
- this.date = date;
- }
+ public Date getDate() {
+ return date;
+ }
- public PidType getType() {
- return type;
- }
+ public void setDate(Date date) {
+ this.date = date;
+ }
- public void setType(PidType type) {
- this.type = type;
- }
+ public PidType getType() {
+ return type;
+ }
- public List getCollectedFrom() {
- return collectedFrom;
- }
+ public void setType(PidType type) {
+ this.type = type;
+ }
- public void setCollectedFrom(List collectedFrom) {
- this.collectedFrom = collectedFrom;
- }
+ public List getCollectedFrom() {
+ return collectedFrom;
+ }
- public EntityType getEntityType() {
- return entityType;
- }
+ public void setCollectedFrom(List collectedFrom) {
+ this.collectedFrom = collectedFrom;
+ }
- public void setEntityType(EntityType entityType) {
- this.entityType = entityType;
- }
+ public EntityType getEntityType() {
+ return entityType;
+ }
- public String getOriginalID() {
- return originalID;
- }
+ public void setEntityType(EntityType entityType) {
+ this.entityType = entityType;
+ }
- public void setOriginalID(String originalID) {
- this.originalID = originalID;
- }
+ public String getOriginalID() {
+ return originalID;
+ }
- public boolean isUseOriginal() {
- return useOriginal;
- }
+ public void setOriginalID(String originalID) {
+ this.originalID = originalID;
+ }
- public void setUseOriginal(boolean useOriginal) {
- this.useOriginal = useOriginal;
- }
+ public boolean isUseOriginal() {
+ return useOriginal;
+ }
- @Override
- public int compareTo(Identifier i) {
- //priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) alphabetical order of the originalID
- if (this.getType().compareTo(i.getType()) == 0){ //same type
- if (entityType == EntityType.publication) {
- if (isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID))
- return 1;
- if (isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID))
- return -1;
- }
- if (entityType == EntityType.dataset) {
- if (isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID))
- return 1;
- if (isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID))
- return -1;
- }
+ public void setUseOriginal(boolean useOriginal) {
+ this.useOriginal = useOriginal;
+ }
- if (this.getDate().compareTo(date) == 0) {//same date
+ @Override
+ public int compareTo(Identifier i) {
+ // priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4)
+ // alphabetical order of the originalID
+ if (this.getType().compareTo(i.getType()) == 0) { // same type
+ if (entityType == EntityType.publication) {
+ if (isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID)
+ && !isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID))
+ return 1;
+ if (isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID)
+ && !isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID))
+ return -1;
+ }
+ if (entityType == EntityType.dataset) {
+ if (isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID)
+ && !isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID))
+ return 1;
+ if (isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID)
+ && !isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID))
+ return -1;
+ }
- if (this.originalID.compareTo(i.originalID) > 0)
- this.useOriginal = true;
- else
- i.setUseOriginal(true);
+ if (this.getDate().compareTo(date) == 0) {// same date
- //the minus because we need to take the alphabetically lower id
- return -this.originalID.compareTo(i.originalID);
- }
- else
- //the minus is because we need to take the elder date
- return -this.getDate().compareTo(date);
- }
- else {
- return this.getType().compareTo(i.getType());
- }
+ if (this.originalID.compareTo(i.originalID) > 0)
+ this.useOriginal = true;
+ else
+ i.setUseOriginal(true);
- }
+ // the minus because we need to take the alphabetically lower id
+ return -this.originalID.compareTo(i.originalID);
+ } else
+ // the minus is because we need to take the elder date
+ return -this.getDate().compareTo(date);
+ } else {
+ return this.getType().compareTo(i.getType());
+ }
- public boolean isFromDatasourceID(List collectedFrom, String dsId){
+ }
- for(KeyValue cf: collectedFrom) {
- if(cf.getKey().equals(dsId))
- return true;
- }
- return false;
- }
+ public boolean isFromDatasourceID(List collectedFrom, String dsId) {
+
+ for (KeyValue cf : collectedFrom) {
+ if (cf.getKey().equals(dsId))
+ return true;
+ }
+ return false;
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java
index a7d8ead0b3..84dfecd624 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OrgSimRel.java
@@ -1,83 +1,98 @@
+
package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
public class OrgSimRel implements Serializable {
- String local_id;
- String oa_original_id;
- String oa_name;
- String oa_acronym;
- String oa_country;
- String oa_url;
- String oa_collectedfrom;
+ String local_id;
+ String oa_original_id;
+ String oa_name;
+ String oa_acronym;
+ String oa_country;
+ String oa_url;
+ String oa_collectedfrom;
- public OrgSimRel() {
- }
+ public OrgSimRel() {
+ }
- public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country, String oa_url, String oa_collectedfrom) {
- this.local_id = local_id;
- this.oa_original_id = oa_original_id;
- this.oa_name = oa_name;
- this.oa_acronym = oa_acronym;
- this.oa_country = oa_country;
- this.oa_url = oa_url;
- this.oa_collectedfrom = oa_collectedfrom;
- }
+ public OrgSimRel(String local_id, String oa_original_id, String oa_name, String oa_acronym, String oa_country,
+ String oa_url, String oa_collectedfrom) {
+ this.local_id = local_id;
+ this.oa_original_id = oa_original_id;
+ this.oa_name = oa_name;
+ this.oa_acronym = oa_acronym;
+ this.oa_country = oa_country;
+ this.oa_url = oa_url;
+ this.oa_collectedfrom = oa_collectedfrom;
+ }
- public String getLocal_id() {
- return local_id;
- }
+ public String getLocal_id() {
+ return local_id;
+ }
- public void setLocal_id(String local_id) {
- this.local_id = local_id;
- }
+ public void setLocal_id(String local_id) {
+ this.local_id = local_id;
+ }
- public String getOa_original_id() {
- return oa_original_id;
- }
+ public String getOa_original_id() {
+ return oa_original_id;
+ }
- public void setOa_original_id(String oa_original_id) {
- this.oa_original_id = oa_original_id;
- }
+ public void setOa_original_id(String oa_original_id) {
+ this.oa_original_id = oa_original_id;
+ }
- public String getOa_name() {
- return oa_name;
- }
+ public String getOa_name() {
+ return oa_name;
+ }
- public void setOa_name(String oa_name) {
- this.oa_name = oa_name;
- }
+ public void setOa_name(String oa_name) {
+ this.oa_name = oa_name;
+ }
- public String getOa_acronym() {
- return oa_acronym;
- }
+ public String getOa_acronym() {
+ return oa_acronym;
+ }
- public void setOa_acronym(String oa_acronym) {
- this.oa_acronym = oa_acronym;
- }
+ public void setOa_acronym(String oa_acronym) {
+ this.oa_acronym = oa_acronym;
+ }
- public String getOa_country() {
- return oa_country;
- }
+ public String getOa_country() {
+ return oa_country;
+ }
- public void setOa_country(String oa_country) {
- this.oa_country = oa_country;
- }
+ public void setOa_country(String oa_country) {
+ this.oa_country = oa_country;
+ }
- public String getOa_url() {
- return oa_url;
- }
+ public String getOa_url() {
+ return oa_url;
+ }
- public void setOa_url(String oa_url) {
- this.oa_url = oa_url;
- }
+ public void setOa_url(String oa_url) {
+ this.oa_url = oa_url;
+ }
- public String getOa_collectedfrom() {
- return oa_collectedfrom;
- }
+ public String getOa_collectedfrom() {
+ return oa_collectedfrom;
+ }
- public void setOa_collectedfrom(String oa_collectedfrom) {
- this.oa_collectedfrom = oa_collectedfrom;
- }
+ public void setOa_collectedfrom(String oa_collectedfrom) {
+ this.oa_collectedfrom = oa_collectedfrom;
+ }
+
+ @Override
+ public String toString() {
+ return "OrgSimRel{" +
+ "local_id='" + local_id + '\'' +
+ ", oa_original_id='" + oa_original_id + '\'' +
+ ", oa_name='" + oa_name + '\'' +
+ ", oa_acronym='" + oa_acronym + '\'' +
+ ", oa_country='" + oa_country + '\'' +
+ ", oa_url='" + oa_url + '\'' +
+ ", oa_collectedfrom='" + oa_collectedfrom + '\'' +
+ '}';
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java
index ab5c498680..c3241bac64 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/PidType.java
@@ -1,25 +1,17 @@
+
package eu.dnetlib.dhp.oa.dedup;
public enum PidType {
- //from the less to the more important
- undefined,
- original,
- orcid,
- ror,
- grid,
- pdb,
- arXiv,
- pmid,
- doi;
+ // from the less to the more important
+ undefined, original, orcid, ror, grid, pdb, arXiv, pmid, doi;
- public static PidType classidValueOf(String s){
- try {
- return PidType.valueOf(s);
- }
- catch (Exception e) {
- return PidType.undefined;
- }
- }
+ public static PidType classidValueOf(String s) {
+ try {
+ return PidType.valueOf(s);
+ } catch (Exception e) {
+ return PidType.undefined;
+ }
+ }
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
index b35f9c54ad..f9e6448b03 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCollectSimRels.java
@@ -1,21 +1,5 @@
-package eu.dnetlib.dhp.oa.dedup;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.*;
-import org.dom4j.DocumentException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
+package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.ArrayList;
@@ -24,153 +8,177 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.*;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import scala.Tuple2;
+
public class SparkCollectSimRels extends AbstractSparkAction {
- private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
+ private static final Logger log = LoggerFactory.getLogger(SparkCollectSimRels.class);
- Dataset simGroupsDS;
- Dataset groupsDS;
+ Dataset simGroupsDS;
+ Dataset groupsDS;
- public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset simGroupsDS, Dataset groupsDS) {
- super(parser, spark);
- this.simGroupsDS = simGroupsDS;
- this.groupsDS = groupsDS;
- }
+ public SparkCollectSimRels(ArgumentApplicationParser parser, SparkSession spark, Dataset simGroupsDS,
+ Dataset groupsDS) {
+ super(parser, spark);
+ this.simGroupsDS = simGroupsDS;
+ this.groupsDS = groupsDS;
+ }
- public static void main(String[] args) throws Exception {
- ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkBlockStats.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
- parser.parseArgument(args);
+ public static void main(String[] args) throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkBlockStats.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
+ parser.parseArgument(args);
- SparkConf conf = new SparkConf();
+ SparkConf conf = new SparkConf();
- final String dbUrl = parser.get("postgresUrl");
- final String dbUser = parser.get("postgresUser");
- final String dbPassword = parser.get("postgresPassword");
+ final String dbUrl = parser.get("postgresUrl");
+ final String dbUser = parser.get("postgresUser");
+ final String dbPassword = parser.get("postgresPassword");
- SparkSession spark = getSparkSession(conf);
+ SparkSession spark = getSparkSession(conf);
- DataFrameReader readOptions = spark.read()
- .format("jdbc")
- .option("url", dbUrl)
- .option("user", dbUser)
- .option("password", dbPassword);
+ DataFrameReader readOptions = spark
+ .read()
+ .format("jdbc")
+ .option("url", dbUrl)
+ .option("user", dbUser)
+ .option("password", dbPassword);
- new SparkCollectSimRels(
- parser,
- spark,
- readOptions.option("dbtable", "similarity_groups").load(),
- readOptions.option("dbtable", "groups").load()
- ).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
- }
+ new SparkCollectSimRels(
+ parser,
+ spark,
+ readOptions.option("dbtable", "similarity_groups").load(),
+ readOptions.option("dbtable", "groups").load())
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+ }
- @Override
- void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
+ @Override
+ void run(ISLookUpService isLookUpService) throws DocumentException, ISLookUpException, IOException {
- // read oozie parameters
- final String isLookUpUrl = parser.get("isLookUpUrl");
- final String actionSetId = parser.get("actionSetId");
- final String workingPath = parser.get("workingPath");
- final int numPartitions = Optional
- .ofNullable(parser.get("numPartitions"))
- .map(Integer::valueOf)
- .orElse(NUM_PARTITIONS);
- final String dbUrl = parser.get("postgresUrl");
- final String dbUser = parser.get("postgresUser");
+ // read oozie parameters
+ final String isLookUpUrl = parser.get("isLookUpUrl");
+ final String actionSetId = parser.get("actionSetId");
+ final String workingPath = parser.get("workingPath");
+ final int numPartitions = Optional
+ .ofNullable(parser.get("numPartitions"))
+ .map(Integer::valueOf)
+ .orElse(NUM_PARTITIONS);
+ final String dbUrl = parser.get("postgresUrl");
+ final String dbUser = parser.get("postgresUser");
- log.info("numPartitions: '{}'", numPartitions);
- log.info("isLookUpUrl: '{}'", isLookUpUrl);
- log.info("actionSetId: '{}'", actionSetId);
- log.info("workingPath: '{}'", workingPath);
- log.info("postgresUser: {}", dbUser);
- log.info("postgresUrl: {}", dbUrl);
- log.info("postgresPassword: xxx");
+ log.info("numPartitions: '{}'", numPartitions);
+ log.info("isLookUpUrl: '{}'", isLookUpUrl);
+ log.info("actionSetId: '{}'", actionSetId);
+ log.info("workingPath: '{}'", workingPath);
+ log.info("postgresUser: {}", dbUser);
+ log.info("postgresUrl: {}", dbUrl);
+ log.info("postgresPassword: xxx");
- JavaPairRDD> similarityGroup =
- simGroupsDS
- .toJavaRDD()
- .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
- .groupByKey()
- .mapToPair(i -> new Tuple2<>(i._1(), StreamSupport.stream(i._2().spliterator(), false)
- .collect(Collectors.toList())));
+ JavaPairRDD> similarityGroup = simGroupsDS
+ .toJavaRDD()
+ .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)))
+ .groupByKey()
+ .mapToPair(
+ i -> new Tuple2<>(i._1(), StreamSupport
+ .stream(i._2().spliterator(), false)
+ .collect(Collectors.toList())));
- JavaPairRDD groupIds =
- groupsDS
- .toJavaRDD()
- .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
+ JavaPairRDD groupIds = groupsDS
+ .toJavaRDD()
+ .mapToPair(r -> new Tuple2<>(r.getString(0), r.getString(1)));
- JavaRDD, List>> groups = similarityGroup
- .leftOuterJoin(groupIds)
- .filter(g -> g._2()._2().isPresent())
- .map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
+ JavaRDD, List>> groups = similarityGroup
+ .leftOuterJoin(groupIds)
+ .filter(g -> g._2()._2().isPresent())
+ .map(g -> new Tuple2<>(new Tuple2<>(g._1(), g._2()._2().get()), g._2()._1()));
- JavaRDD relations = groups.flatMap(g -> {
- String firstId = g._2().get(0);
- List rels = new ArrayList<>();
+ JavaRDD relations = groups.flatMap(g -> {
+ String firstId = g._2().get(0);
+ List rels = new ArrayList<>();
- for (String id : g._2()) {
- if (!firstId.equals(id))
- rels.add(createSimRel(firstId, id, g._1()._2()));
- }
+ for (String id : g._2()) {
+ if (!firstId.equals(id))
+ rels.add(createSimRel(firstId, id, g._1()._2()));
+ }
- return rels.iterator();
- });
+ return rels.iterator();
+ });
- Dataset resultRelations = spark.createDataset(
- relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
- Encoders.bean(Relation.class)
- ).repartition(numPartitions);
+ Dataset resultRelations = spark
+ .createDataset(
+ relations.filter(r -> r.getRelType().equals("resultResult")).rdd(),
+ Encoders.bean(Relation.class))
+ .repartition(numPartitions);
- Dataset organizationRelations = spark.createDataset(
- relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
- Encoders.bean(Relation.class)
- ).repartition(numPartitions);
+ Dataset organizationRelations = spark
+ .createDataset(
+ relations.filter(r -> r.getRelType().equals("organizationOrganization")).rdd(),
+ Encoders.bean(Relation.class))
+ .repartition(numPartitions);
- for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
- switch(dedupConf.getWf().getSubEntityValue()){
- case "organization":
- savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
- break;
- default:
- savePostgresRelation(resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
- break;
- }
- }
+ for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
+ switch (dedupConf.getWf().getSubEntityValue()) {
+ case "organization":
+ savePostgresRelation(organizationRelations, workingPath, actionSetId, "organization");
+ break;
+ default:
+ savePostgresRelation(
+ resultRelations, workingPath, actionSetId, dedupConf.getWf().getSubEntityValue());
+ break;
+ }
+ }
- }
+ }
- private Relation createSimRel(String source, String target, String entity) {
- final Relation r = new Relation();
- r.setSubRelType("dedupSimilarity");
- r.setRelClass("isSimilarTo");
- r.setDataInfo(new DataInfo());
+ private Relation createSimRel(String source, String target, String entity) {
+ final Relation r = new Relation();
+ r.setSubRelType("dedupSimilarity");
+ r.setRelClass("isSimilarTo");
+ r.setDataInfo(new DataInfo());
- switch (entity) {
- case "result":
- r.setSource("50|" + source);
- r.setTarget("50|" + target);
- r.setRelType("resultResult");
- break;
- case "organization":
- r.setSource("20|" + source);
- r.setTarget("20|" + target);
- r.setRelType("organizationOrganization");
- break;
- default:
- throw new IllegalArgumentException("unmanaged entity type: " + entity);
- }
- return r;
- }
+ switch (entity) {
+ case "result":
+ r.setSource("50|" + source);
+ r.setTarget("50|" + target);
+ r.setRelType("resultResult");
+ break;
+ case "organization":
+ r.setSource("20|" + source);
+ r.setTarget("20|" + target);
+ r.setRelType("organizationOrganization");
+ break;
+ default:
+ throw new IllegalArgumentException("unmanaged entity type: " + entity);
+ }
+ return r;
+ }
- private void savePostgresRelation(Dataset newRelations, String workingPath, String actionSetId, String entityType) {
- newRelations
- .write()
- .mode(SaveMode.Append)
- .parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
- }
+ private void savePostgresRelation(Dataset newRelations, String workingPath, String actionSetId,
+ String entityType) {
+ newRelations
+ .write()
+ .mode(SaveMode.Append)
+ .parquet(DedupUtility.createSimRelPath(workingPath, actionSetId, entityType));
+ }
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index ce6226ddee..1122b42ebd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -104,13 +104,13 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
.mapToPair((PairFunction) s -> new Tuple2<>(hash(s), s));
- final RDD> edgeRdd = spark
- .read()
- .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
- .as(Encoders.bean(Relation.class))
- .javaRDD()
- .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
- .rdd();
+ final RDD> edgeRdd = spark
+ .read()
+ .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
+ .as(Encoders.bean(Relation.class))
+ .javaRDD()
+ .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
+ .rdd();
final Dataset mergeRels = spark
.createDataset(
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index babccefb4c..d5033d4253 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -100,17 +100,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
.repartition(numPartitions);
// create relations by comparing only elements in the same group
- spark.createDataset(
- Deduper
- .computeRelations(sc, blocks, dedupConf)
- .map(t -> createSimRel(t._1(), t._2(), entity))
- .repartition(numPartitions)
- .rdd(),
- Encoders.bean(Relation.class)
- )
- .write()
- .mode(SaveMode.Append)
- .parquet(outputPath);
+ spark
+ .createDataset(
+ Deduper
+ .computeRelations(sc, blocks, dedupConf)
+ .map(t -> createSimRel(t._1(), t._2(), entity))
+ .repartition(numPartitions)
+ .rdd(),
+ Encoders.bean(Relation.class))
+ .write()
+ .mode(SaveMode.Append)
+ .parquet(outputPath);
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
index dff1bbb0de..d6c548de31 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
@@ -1,13 +1,11 @@
+
package eu.dnetlib.dhp.oa.dedup;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
+import static jdk.nashorn.internal.objects.NativeDebug.map;
+
+import java.io.IOException;
+import java.util.*;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -15,6 +13,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@@ -24,145 +23,172 @@ import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import scala.Tuple2;
public class SparkPrepareOrgRels extends AbstractSparkAction {
- private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
+ private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
- public static final String ROOT_TRUST = "0.8";
- public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
- public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
+ public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
+ super(parser, spark);
+ }
- public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
- super(parser, spark);
- }
+ public static void main(String[] args) throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCreateSimRels.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
+ parser.parseArgument(args);
- public static void main(String[] args) throws Exception {
- ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkCreateSimRels.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
- parser.parseArgument(args);
+ SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(ModelSupport.getOafModelClasses());
- SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+ new SparkPrepareOrgRels(parser, getSparkSession(conf))
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+ }
- new SparkCreateDedupRecord(parser, getSparkSession(conf))
- .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
- }
+ @Override
+ public void run(ISLookUpService isLookUpService) throws IOException {
- @Override
- public void run(ISLookUpService isLookUpService) throws IOException {
+ final String graphBasePath = parser.get("graphBasePath");
+ final String isLookUpUrl = parser.get("isLookUpUrl");
+ final String actionSetId = parser.get("actionSetId");
+ final String workingPath = parser.get("workingPath");
+ final int numConnections = Optional
+ .ofNullable(parser.get("numConnections"))
+ .map(Integer::valueOf)
+ .orElse(NUM_CONNECTIONS);
- final String graphBasePath = parser.get("graphBasePath");
- final String isLookUpUrl = parser.get("isLookUpUrl");
- final String actionSetId = parser.get("actionSetId");
- final String workingPath = parser.get("workingPath");
- final String apiUrl = parser.get("apiUrl");
- final String dbUrl = parser.get("dbUrl");
- final String dbTable = parser.get("dbTable");
- final String dbUser = parser.get("dbUser");
- final String dbPwd = parser.get("dbPwd");
+ final String apiUrl = Optional
+ .ofNullable(parser.get("apiUrl"))
+ .orElse("");
- log.info("graphBasePath: '{}'", graphBasePath);
- log.info("isLookUpUrl: '{}'", isLookUpUrl);
- log.info("actionSetId: '{}'", actionSetId);
- log.info("workingPath: '{}'", workingPath);
- log.info("apiUrl: '{}'", apiUrl);
- log.info("dbUrl: '{}'", dbUrl);
- log.info("dbUser: '{}'", dbUser);
- log.info("table: '{}'", dbTable);
- log.info("dbPwd: '{}'", "xxx");
+ final String dbUrl = parser.get("dbUrl");
+ final String dbTable = parser.get("dbTable");
+ final String dbUser = parser.get("dbUser");
+ final String dbPwd = parser.get("dbPwd");
- final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
- final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
+ log.info("graphBasePath: '{}'", graphBasePath);
+ log.info("isLookUpUrl: '{}'", isLookUpUrl);
+ log.info("actionSetId: '{}'", actionSetId);
+ log.info("workingPath: '{}'", workingPath);
+ log.info("numPartitions: '{}'", numConnections);
+ log.info("apiUrl: '{}'", apiUrl);
+ log.info("dbUrl: '{}'", dbUrl);
+ log.info("dbUser: '{}'", dbUser);
+ log.info("table: '{}'", dbTable);
+ log.info("dbPwd: '{}'", "xxx");
- Dataset relations = createRelations(spark, mergeRelPath, entityPath);
+ final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
+ final String entityPath = DedupUtility.createEntityPath(graphBasePath, "organization");
- final Properties connectionProperties = new Properties();
- connectionProperties.put("user", dbUser);
- connectionProperties.put("password", dbPwd);
+ Dataset relations = createRelations(spark, mergeRelPath, entityPath);
- relations.write().mode(SaveMode.Overwrite).jdbc(dbUrl, dbTable, connectionProperties);
+ final Properties connectionProperties = new Properties();
+ connectionProperties.put("user", dbUser);
+ connectionProperties.put("password", dbPwd);
- if (!apiUrl.isEmpty())
- updateSimRels(apiUrl);
+ relations
+ .repartition(numConnections)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .jdbc(dbUrl, dbTable, connectionProperties);
- }
+ if (!apiUrl.isEmpty())
+ updateSimRels(apiUrl);
- public static Dataset createRelations(
- final SparkSession spark,
- final String mergeRelsPath,
- final String entitiesPath) {
+ }
- //
- Dataset> entities = spark
- .read()
- .textFile(entitiesPath)
- .map(
- (MapFunction>) it -> {
- Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
- return new Tuple2<>(entity.getId(), entity);
- },
- Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
+ public static Dataset createRelations(
+ final SparkSession spark,
+ final String mergeRelsPath,
+ final String entitiesPath) {
- Dataset> relations = spark.createDataset(
- spark
- .read()
- .load(mergeRelsPath)
- .as(Encoders.bean(Relation.class))
- .where("relClass == 'merges'")
- .toJavaRDD()
- .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
- .groupByKey()
- .flatMap(g -> {
- List> rels = new ArrayList<>();
- for (String id1 : g._2()) {
- for (String id2 : g._2()) {
- if (!id1.equals(id2))
- if (id1.contains("openorgs"))
- rels.add(new Tuple2<>(id1, id2));
- }
- }
- return rels.iterator();
- }).rdd(),
- Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+ //
+ Dataset> entities = spark
+ .read()
+ .textFile(entitiesPath)
+ .map(
+ (MapFunction>) it -> {
+ Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
+ return new Tuple2<>(entity.getId(), entity);
+ },
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
- return relations
- .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
- .map(
- (MapFunction, Tuple2>, OrgSimRel>)r ->
- new OrgSimRel(
- r._1()._2(),
- r._2()._2().getOriginalId().get(0),
- r._2()._2().getLegalname().getValue(),
- r._2()._2().getLegalshortname().getValue(),
- r._2()._2().getCountry().getClassid(),
- r._2()._2().getWebsiteurl().getValue(),
- r._2()._2().getCollectedfrom().get(0).getValue()
- ),
- Encoders.bean(OrgSimRel.class)
- );
+ Dataset> relations = spark
+ .createDataset(
+ spark
+ .read()
+ .load(mergeRelsPath)
+ .as(Encoders.bean(Relation.class))
+ .where("relClass == 'merges'")
+ .toJavaRDD()
+ .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
+ .groupByKey()
+ .flatMap(g -> {
+ List> rels = new ArrayList<>();
+ for (String id1 : g._2()) {
+ for (String id2 : g._2()) {
+ if (!id1.equals(id2))
+ if (id1.contains("openorgs____") && !id2.contains("openorgsmesh"))
+ rels.add(new Tuple2<>(id1, id2));
+ }
+ }
+ return rels.iterator();
+ })
+ .rdd(),
+ Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
- }
+ Dataset> relations2 = relations //
+ .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
+ .map(
+ (MapFunction, Tuple2>, OrgSimRel>) r -> new OrgSimRel(
+ r._1()._1(),
+ r._2()._2().getOriginalId().get(0),
+ r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
+ r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
+ r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
+ r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
+ r._2()._2().getCollectedfrom().get(0).getValue()),
+ Encoders.bean(OrgSimRel.class))
+ .map(
+ (MapFunction>) o -> new Tuple2<>(o.getLocal_id(), o),
+ Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));
- private static String updateSimRels(final String apiUrl) throws IOException {
- final HttpGet req = new HttpGet(apiUrl);
- try (final CloseableHttpClient client = HttpClients.createDefault()) {
- try (final CloseableHttpResponse response = client.execute(req)) {
- return IOUtils.toString(response.getEntity().getContent());
- }
- }
- }
+ return relations2
+ .joinWith(entities, relations2.col("_1").equalTo(entities.col("_1")), "inner")
+ .map(
+ (MapFunction, Tuple2>, OrgSimRel>) r -> {
+ OrgSimRel orgSimRel = r._1()._2();
+ orgSimRel.setLocal_id(r._2()._2().getOriginalId().get(0));
+ return orgSimRel;
+ },
+ Encoders.bean(OrgSimRel.class));
+
+ }
+
+ private static String updateSimRels(final String apiUrl) throws IOException {
+
+ log.info("Updating simrels on the portal");
+
+ final HttpGet req = new HttpGet(apiUrl);
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+ }
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml
index 82ecbc46bb..ec9967d6a0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/orgsdedup/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
graphBasePath
@@ -24,10 +24,6 @@
cutConnectedComponent
max number of elements in a connected component
-
- apiUrl
- the url for the APIs of the openorgs service
-
dbUrl
the url of the database
@@ -109,6 +105,16 @@
+
+
+
+
+
+
+ -pb
+ ${graphBasePath}/relation
+ ${workingPath}/${actionSetId}/organization_simrel
+
@@ -136,16 +142,6 @@
--workingPath${workingPath}
--numPartitions8000
-
-
-
-
-
-
- -pb
- ${graphBasePath}/relation
- ${workingPath}/organization_simrel
-
@@ -203,6 +199,7 @@
--dbTable${dbTable}
--dbUser${dbUser}
--dbPwd${dbPwd}
+ --numConnections20
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json
index bcca48a156..b70d1af281 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json
@@ -23,11 +23,17 @@
"paramDescription": "the id of the actionset (orchestrator)",
"paramRequired": true
},
+ {
+ "paramName": "nc",
+ "paramLongName": "numConnections",
+ "paramDescription": "number of connections to the postgres db (for the write operation)",
+ "paramRequired": false
+ },
{
"paramName": "au",
"paramLongName": "apiUrl",
"paramDescription": "the url for the APIs of the openorgs service",
- "paramRequired": true
+ "paramRequired": false
},
{
"paramName": "du",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
index 4317515840..30b213ff22 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
@@ -138,10 +138,10 @@ public class EntityMergerTest implements Serializable {
public void publicationMergerTest3() throws InstantiationException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
- .entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
+ .entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
// verify id
- assertEquals( "50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
+ assertEquals("50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
}
@@ -149,7 +149,7 @@ public class EntityMergerTest implements Serializable {
public void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
- .entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
+ .entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_wf_001::2d2bbbbcfb285e3fb3590237b79e2fa8", pub_merged.getId());
@@ -160,7 +160,7 @@ public class EntityMergerTest implements Serializable {
public void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
- .entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class);
+ .entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_wf_001::584b89679c3ccd1015b647ec63cc2699", pub_merged.getId());
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 59c8505917..d9b5e30ebc 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -1,11 +1,18 @@
+
package eu.dnetlib.dhp.oa.dedup;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.util.MapDocumentUtil;
+import static java.nio.file.Files.createTempDirectory;
+
+import static org.apache.spark.sql.functions.count;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.lenient;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@@ -21,19 +28,16 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.nio.file.Paths;
-
-import static java.nio.file.Files.createTempDirectory;
-import static org.apache.spark.sql.functions.count;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.lenient;
-
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest implements Serializable {
@@ -48,7 +52,7 @@ public class SparkDedupTest implements Serializable {
private static String testOutputBasePath;
private static String testDedupGraphBasePath;
private static final String testActionSetId = "test-orchestrator";
- private static String testDedupAssertionsBasePath;
+ private static String testDedupAssertionsBasePath;
@BeforeAll
public static void cleanUp() throws IOException, URISyntaxException {
@@ -64,9 +68,9 @@ public class SparkDedupTest implements Serializable {
.toAbsolutePath()
.toString();
testDedupAssertionsBasePath = Paths
- .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
- .toFile()
- .getAbsolutePath();
+ .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
+ .toFile()
+ .getAbsolutePath();
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@@ -82,7 +86,7 @@ public class SparkDedupTest implements Serializable {
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
- }
+ }
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
@@ -165,98 +169,98 @@ public class SparkDedupTest implements Serializable {
new SparkCreateSimRels(parser, spark).run(isLookUpService);
- long orgs_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
- .count();
+ long orgs_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+ .count();
- long pubs_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
- .count();
+ long pubs_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+ .count();
- long sw_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
- .count();
+ long sw_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+ .count();
- long ds_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
- .count();
+ long ds_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+ .count();
- long orp_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
- .count();
+ long orp_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+ .count();
- assertEquals(3432, orgs_simrel);
- assertEquals(7152, pubs_simrel);
+ assertEquals(3082, orgs_simrel);
+ assertEquals(7036, pubs_simrel);
assertEquals(344, sw_simrel);
- assertEquals(458, ds_simrel);
+ assertEquals(442, ds_simrel);
assertEquals(6750, orp_simrel);
}
- @Test
- @Order(2)
- public void collectSimRelsTest() throws Exception {
- ArgumentApplicationParser parser = new ArgumentApplicationParser(
- IOUtils
- .toString(
- SparkCreateSimRels.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
- parser
- .parseArgument(
- new String[] {
- "-asi", testActionSetId,
- "-la", "lookupurl",
- "-w", testOutputBasePath,
- "-np", "50",
- "-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
- "-pusr", "postgres_url",
- "-ppwd", ""
- });
+ @Test
+ @Order(2)
+ public void collectSimRelsTest() throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCollectSimRels.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/collectSimRels_parameters.json")));
+ parser
+ .parseArgument(
+ new String[] {
+ "-asi", testActionSetId,
+ "-la", "lookupurl",
+ "-w", testOutputBasePath,
+ "-np", "50",
+ "-purl", "jdbc:postgresql://localhost:5432/dnet_dedup",
+ "-pusr", "postgres_user",
+ "-ppwd", ""
+ });
- new SparkCollectSimRels(
- parser,
- spark,
- spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
- spark.read().load(testDedupAssertionsBasePath + "/groups")
- ).run(null);
+ new SparkCollectSimRels(
+ parser,
+ spark,
+ spark.read().load(testDedupAssertionsBasePath + "/similarity_groups"),
+ spark.read().load(testDedupAssertionsBasePath + "/groups"))
+ .run(isLookUpService);
- long orgs_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
- .count();
+ long orgs_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+ .count();
- long pubs_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
- .count();
+ long pubs_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+ .count();
- long sw_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
- .count();
+ long sw_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+ .count();
- long ds_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
- .count();
+ long ds_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+ .count();
- long orp_simrel = spark
- .read()
- .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
- .count();
+ long orp_simrel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+ .count();
- assertEquals(4022, orgs_simrel);
- assertEquals(10575, pubs_simrel);
- assertEquals(3767, sw_simrel);
- assertEquals(3881, ds_simrel);
- assertEquals(10173, orp_simrel);
+ assertEquals(3672, orgs_simrel);
+ assertEquals(10459, pubs_simrel);
+ assertEquals(3767, sw_simrel);
+ assertEquals(3865, ds_simrel);
+ assertEquals(10173, orp_simrel);
- }
+ }
@Test
@Order(3)
@@ -402,8 +406,8 @@ public class SparkDedupTest implements Serializable {
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
.count();
- assertEquals(1276, orgs_mergerel);
- assertEquals(1442, pubs_mergerel);
+ assertEquals(1272, orgs_mergerel);
+ assertEquals(1438, pubs_mergerel);
assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel);
@@ -449,10 +453,10 @@ public class SparkDedupTest implements Serializable {
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
.count();
- assertEquals(82, orgs_deduprecord);
- assertEquals(66, pubs_deduprecord);
+ assertEquals(84, orgs_deduprecord);
+ assertEquals(65, pubs_deduprecord);
assertEquals(51, sw_deduprecord);
- assertEquals(96, ds_deduprecord);
+ assertEquals(97, ds_deduprecord);
assertEquals(89, orp_deduprecord);
}
@@ -532,12 +536,12 @@ public class SparkDedupTest implements Serializable {
.distinct()
.count();
- assertEquals(897, publications);
- assertEquals(835, organizations);
+ assertEquals(896, publications);
+ assertEquals(837, organizations);
assertEquals(100, projects);
assertEquals(100, datasource);
assertEquals(200, softwares);
- assertEquals(388, dataset);
+ assertEquals(389, dataset);
assertEquals(517, otherresearchproduct);
long deletedOrgs = jsc
@@ -592,7 +596,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
- assertEquals(4866, relations);
+ assertEquals(4858, relations);
// check deletedbyinference
final Dataset mergeRels = spark
@@ -641,11 +645,11 @@ public class SparkDedupTest implements Serializable {
assertEquals(expected_unique, rel.distinct().count());
}
-// @AfterAll
-// public static void finalCleanUp() throws IOException {
-// FileUtils.deleteDirectory(new File(testOutputBasePath));
-// FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
-// }
+ @AfterAll
+ public static void finalCleanUp() throws IOException {
+ FileUtils.deleteDirectory(new File(testOutputBasePath));
+ FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+ }
public boolean isDeletedByInference(String s) {
return s.contains("\"deletedbyinference\":true");
diff --git a/pom.xml b/pom.xml
index cec3dd75a0..f7300260cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,7 +315,7 @@
eu.dnetlib
dnet-pace-core
- 4.0.4
+ 4.0.5
eu.dnetlib