diff --git a/.gitignore b/.gitignore
index 486eacee9..3f00d9729 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,5 @@
/*/build
/build
spark-warehouse
-/dhp-workflows/dhp-graph-mapper/job-override.properties
+/*/*/job-override.properties
+
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 6fac06b68..43c2a3834 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -17,6 +17,10 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -29,21 +33,15 @@
<groupId>javax.persistence</groupId>
<artifactId>javax.persistence-api</artifactId>
-
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
-
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
index a4970a928..cbfc5caf1 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
@@ -2,17 +2,25 @@ package eu.dnetlib.dhp.application;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.*;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.StringWriter;
+import java.util.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.Inflater;
public class ArgumentApplicationParser implements Serializable {
private final Options options = new Options();
private final Map<String, String> objectMap = new HashMap<>();
+ private final List<String> compressedValues = new ArrayList<>();
+
public ArgumentApplicationParser(final String json_configuration) throws Exception {
final ObjectMapper mapper = new ObjectMapper();
final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class);
@@ -29,6 +37,9 @@ public class ArgumentApplicationParser implements Serializable {
final Option o = new Option(conf.getParamName(), true, conf.getParamDescription());
o.setLongOpt(conf.getParamLongName());
o.setRequired(conf.isParamRequired());
+ if (conf.isCompressed()) {
+ compressedValues.add(conf.getParamLongName());
+ }
return o;
}).forEach(options::addOption);
@@ -38,10 +49,32 @@ public class ArgumentApplicationParser implements Serializable {
}
+
+ public static String decompressValue(final String abstractCompressed) {
+ try {
+ byte[] byteArray = Base64.decodeBase64(abstractCompressed.getBytes());
+ GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(byteArray));
+ final StringWriter stringWriter = new StringWriter();
+ IOUtils.copy(gis, stringWriter);
+ return stringWriter.toString();
+ } catch (Throwable e) {
+ System.out.println("Wrong value to decompress:" + abstractCompressed);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String compressArgument(final String value) throws Exception{
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ GZIPOutputStream gzip = new GZIPOutputStream(out);
+ gzip.write(value.getBytes());
+ gzip.close();
+ return java.util.Base64.getEncoder().encodeToString(out.toByteArray());
+ }
+
public void parseArgument(final String[] args) throws Exception {
CommandLineParser parser = new BasicParser();
CommandLine cmd = parser.parse(options, args);
- Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), it.getValue()));
+ Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), compressedValues.contains(it.getLongOpt())? decompressValue(it.getValue()): it.getValue()));
}
public String get(final String key) {
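
Note: the compressed-argument support added above boils down to a gzip + Base64 round trip. A minimal, self-contained sketch of that round trip (JDK only; the class name and sample JSON are illustrative, not part of the patch):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedArgumentRoundTrip {

    public static void main(String[] args) throws Exception {
        final String original = "{\"example\":\"a long JSON value passed as a workflow argument\"}";

        // compress: gzip the bytes, then Base64-encode so the value survives being passed as a CLI option
        final ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBytes)) {
            gzip.write(original.getBytes("UTF-8"));
        }
        final String compressed = Base64.getEncoder().encodeToString(compressedBytes.toByteArray());

        // decompress: Base64-decode, then gunzip back to the original string
        final ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (GZIPInputStream gunzip = new GZIPInputStream(
                new ByteArrayInputStream(Base64.getDecoder().decode(compressed)))) {
            final byte[] buffer = new byte[4096];
            int read;
            while ((read = gunzip.read(buffer)) != -1) {
                restored.write(buffer, 0, read);
            }
        }

        System.out.println(original.equals(restored.toString("UTF-8"))); // prints: true
    }
}
```

Options declared with "compressed": true in the parameters JSON are decompressed transparently by parseArgument, so callers still read them with parser.get(longName).
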
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java
index 92079fce7..4e7c2826b 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java
@@ -7,6 +7,7 @@ public class OptionsParameter {
private String paramLongName;
private String paramDescription;
private boolean paramRequired;
+ private boolean compressed;
public OptionsParameter() {
}
@@ -26,4 +27,12 @@ public class OptionsParameter {
public boolean isParamRequired() {
return paramRequired;
}
+
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean compressed) {
+ this.compressed = compressed;
+ }
}
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java
index 2033919b9..fdea3c2d4 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java
@@ -3,6 +3,10 @@ package eu.dnetlib.dhp.application;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
+import java.util.Base64;
+import java.util.zip.GZIPOutputStream;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -24,6 +28,7 @@ public class ArgumentApplicationParserTest {
"-ro", "value7",
"-rr", "value8",
"-w", "value9",
+ "-cc", ArgumentApplicationParser.compressArgument(jsonConfiguration)
});
assertNotNull(parser.get("hdfsPath"));
assertNotNull(parser.get("apidescriptor"));
@@ -45,7 +50,12 @@ public class ArgumentApplicationParserTest {
assertEquals("value7", parser.get("rabbitOngoingQueue"));
assertEquals("value8", parser.get("rabbitReportQueue"));
assertEquals("value9", parser.get("workflowId"));
+ assertEquals(jsonConfiguration, parser.get("ccCoco"));
}
+
+
+
+
}
diff --git a/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json b/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json
index 60c2d391a..13c199166 100644
--- a/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json
+++ b/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json
@@ -1,12 +1,13 @@
[
- {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
- {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
- {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
- {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
- {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rp", "paramLongName":"rabbitPassWord", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
- {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
- {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
- {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}
+ {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
+ {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
+ {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
+ {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
+ {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
+ {"paramName":"rp", "paramLongName":"rabbitPassWord", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
+ {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
+ {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
+ {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
+ {"paramName":"cc", "paramLongName":"ccCoco", "paramDescription": "the identifier of the dnet Workflow", "compressed":true,"paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
index f27704c5c..dc5ac61e8 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
@@ -84,4 +84,31 @@ public class Instance implements Serializable {
public void setDateofacceptance(Field<String> dateofacceptance) {
this.dateofacceptance = dateofacceptance;
}
+ public String toComparableString(){
+ return String.format("%s::%s::%s::%s",
+ hostedby != null && hostedby.getKey()!= null ? hostedby.getKey().toLowerCase() : "",
+ accessright!= null && accessright.getClassid()!= null ? accessright.getClassid() : "",
+ instancetype!= null && instancetype.getClassid()!= null ? instancetype.getClassid() : "",
+ url != null ? url:"");
+ }
+
+ @Override
+ public int hashCode() {
+ return toComparableString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ Instance other = (Instance) obj;
+
+ return toComparableString()
+ .equals(other.toComparableString());
+ }
}
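
The value-based equals/hashCode above (keyed on hosting datasource, access right, instance type and URL) is what lets merged instance lists collapse duplicates when Result.mergeFrom concatenates them. A simplified, self-contained stand-in showing the pattern (not the project's Instance class; field names here are illustrative):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified stand-in for illustration only: equality delegates to a normalized key string,
// so set-based de-duplication works by value rather than by object identity.
public class InstanceKeyExample {

    private final String hostedbyKey;
    private final String accessright;
    private final String instancetype;
    private final String url;

    public InstanceKeyExample(String hostedbyKey, String accessright, String instancetype, String url) {
        this.hostedbyKey = hostedbyKey;
        this.accessright = accessright;
        this.instancetype = instancetype;
        this.url = url;
    }

    public String toComparableString() {
        return String.format("%s::%s::%s::%s",
                hostedbyKey != null ? hostedbyKey.toLowerCase() : "",
                accessright != null ? accessright : "",
                instancetype != null ? instancetype : "",
                url != null ? url : "");
    }

    @Override
    public int hashCode() {
        return toComparableString().hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof InstanceKeyExample
                && toComparableString().equals(((InstanceKeyExample) obj).toComparableString());
    }

    public static void main(String[] args) {
        Set<InstanceKeyExample> merged = new LinkedHashSet<>();
        merged.add(new InstanceKeyExample("openaire____::doaj", "OPEN", "0001", "http://example.org/a"));
        merged.add(new InstanceKeyExample("OPENAIRE____::DOAJ", "OPEN", "0001", "http://example.org/a")); // same key, case differs
        System.out.println(merged.size()); // prints: 1
    }
}
```
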
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 0e34d8ba6..1bb7f6a67 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -6,6 +6,8 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
public abstract class Result extends OafEntity implements Serializable {
@@ -251,13 +253,12 @@ public abstract class Result extends OafEntity implements Serializable {
Result r = (Result) e;
- mergeAuthors(r.getAuthor());
+
//TODO mergeFrom is used only for create Dedup Records since the creation of these two fields requires more complex functions (maybe they will be filled in an external function)
-// if (author == null)
-// author = r.getAuthor(); //authors will be replaced because they could be too much
// dateofacceptance = r.getDateofacceptance();
-// instance = mergeLists(instance, r.getInstance());
+
+ instance = mergeLists(instance, r.getInstance());
if (r.getResulttype() != null)
resulttype = r.getResulttype();
@@ -309,80 +310,5 @@ public abstract class Result extends OafEntity implements Serializable {
}
- public void mergeAuthors(List<Author> authors){
- int c1 = countAuthorsPids(author);
- int c2 = countAuthorsPids(authors);
- int s1 = authorsSize(author);
- int s2 = authorsSize(authors);
-
- //if both have no authors with pids and authors is bigger than author
- if (c1 == 0 && c2 == 0 && author.size() < authors.size())
- public int countAuthorsPids(List<Author> authors){
- if (authors == null)
- return -1;
-
- return (int) authors.stream().map(this::extractAuthorPid).filter(Objects::nonNull).filter(StringUtils::isNotBlank).count();
- }
-
- public int authorsSize(List<Author> authors){
- if (authors == null)
- return 0;
- return authors.size();
- }
-
- public String extractAuthorPid(Author a){
-
- if(a == null || a.getPid() == null || a.getPid().size() == 0)
- return null;
-
- StringBuilder mainPid = new StringBuilder();
-
- a.getPid().forEach(pid ->{
- if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) {
- mainPid.setLength(0);
- mainPid.append(pid.getValue());
- }
- else {
- if(mainPid.length() == 0)
- mainPid.append(pid.getValue());
- }
- });
-
- return mainPid.toString();
-
- }
}
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
index 5f5ed5a3b..cef50aa95 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
@@ -9,7 +9,7 @@
-
+
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
index ef6d9f7ac..a9eae8576 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
@@ -1,11 +1,11 @@
@@ -24,7 +24,7 @@
-
+
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DatePicker.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DatePicker.java
new file mode 100644
index 000000000..73f178edc
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DatePicker.java
@@ -0,0 +1,119 @@
+package eu.dnetlib.dedup;
+
+import eu.dnetlib.dhp.schema.oaf.Field;
+import org.apache.commons.lang.StringUtils;
+
+import java.time.Year;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.reverseOrder;
+import static java.util.Map.Entry.comparingByValue;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang.StringUtils.endsWith;
+import static org.apache.commons.lang.StringUtils.substringBefore;
+
+public class DatePicker {
+
+ private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
+ private static final String DATE_DEFAULT_SUFFIX = "01-01";
+ private static final int YEAR_LB = 1300;
+ private static final int YEAR_UB = Year.now().getValue() + 5;
+
+ public static Field<String> pick(final Collection<String> dateofacceptance) {
+
+ final Map<String, Integer> frequencies = dateofacceptance
+ .parallelStream()
+ .filter(StringUtils::isNotBlank)
+ .collect(
+ Collectors.toConcurrentMap(
+ w -> w, w -> 1, Integer::sum));
+
+ if (frequencies.isEmpty()) {
+ return new Field<>();
+ }
+
+ final Field<String> date = new Field<>();
+ date.setValue(frequencies.keySet().iterator().next());
+
+ // let's sort this map by values first, filtering out invalid dates
+ final Map<String, Integer> sorted = frequencies
+ .entrySet()
+ .stream()
+ .filter(d -> StringUtils.isNotBlank(d.getKey()))
+ .filter(d -> d.getKey().matches(DATE_PATTERN))
+ .filter(d -> inRange(d.getKey()))
+ .sorted(reverseOrder(comparingByValue()))
+ .collect(
+ toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue, (e1, e2) -> e2,
+ LinkedHashMap::new));
+
+ // shortcut
+ if (sorted.size() == 0) {
+ return date;
+ }
+
+ // voting method (1/3 + 1) wins
+ if (sorted.size() >= 3) {
+ final int acceptThreshold = (sorted.size() / 3) + 1;
+ final List<String> accepted = sorted.entrySet().stream()
+ .filter(e -> e.getValue() >= acceptThreshold)
+ .map(e -> e.getKey())
+ .collect(Collectors.toList());
+
+ // cannot find strong majority
+ if (accepted.isEmpty()) {
+ final int max = sorted.values().iterator().next();
+ Optional<String> first = sorted.entrySet().stream()
+ .filter(e -> e.getValue() == max && !endsWith(e.getKey(), DATE_DEFAULT_SUFFIX))
+ .map(Map.Entry::getKey)
+ .findFirst();
+ if (first.isPresent()) {
+ date.setValue(first.get());
+ return date;
+ }
+
+ date.setValue(sorted.keySet().iterator().next());
+ return date;
+ }
+
+ if (accepted.size() == 1) {
+ date.setValue(accepted.get(0));
+ return date;
+ } else {
+ final Optional<String> first = accepted.stream()
+ .filter(d -> !endsWith(d, DATE_DEFAULT_SUFFIX))
+ .findFirst();
+ if (first.isPresent()) {
+ date.setValue(first.get());
+ return date;
+ }
+
+ return date;
+ }
+
+ //1st non YYYY-01-01 is returned
+ } else {
+ if (sorted.size() == 2) {
+ for (Map.Entry<String, Integer> e : sorted.entrySet()) {
+ if (!endsWith(e.getKey(), DATE_DEFAULT_SUFFIX)) {
+ date.setValue(e.getKey());
+ return date;
+ }
+ }
+ }
+
+ // none of the dates seems good enough, return the 1st one
+ date.setValue(sorted.keySet().iterator().next());
+ return date;
+ }
+ }
+
+ private static boolean inRange(final String date) {
+ final int year = Integer.parseInt(substringBefore(date, "-"));
+ return year >= YEAR_LB && year <= YEAR_UB;
+ }
+
+}
\ No newline at end of file
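
DatePicker.pick favours the most frequent well-formed date and avoids default-looking YYYY-01-01 values when an alternative exists. A hedged usage sketch (assumes the class above and the dhp-schemas Field type are on the classpath; the dates are made up):

```java
import eu.dnetlib.dedup.DatePicker;

import java.util.Arrays;
import java.util.List;

public class DatePickerUsage {

    public static void main(String[] args) {
        final List<String> candidates = Arrays.asList(
                "2016-01-01",   // default-looking date (ends with 01-01)
                "2016-03-14",
                "2016-03-14",
                "2016-03-14",
                "not-a-date");  // discarded: does not match the YYYY-MM-DD pattern

        // the most frequent valid date wins; among ties, non-01-01 dates are preferred
        System.out.println(DatePicker.pick(candidates).getValue()); // expected: 2016-03-14
    }
}
```
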
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
index 371e80349..000be640a 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
@@ -1,6 +1,5 @@
package eu.dnetlib.dedup;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.config.DedupConfig;
@@ -10,23 +9,20 @@ import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.codehaus.jackson.map.ObjectMapper;
import scala.Tuple2;
-import java.io.IOException;
import java.util.Collection;
-import java.util.List;
import java.util.Random;
import static java.util.stream.Collectors.toMap;
public class DedupRecordFactory {
- public JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf){
+ public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf){
//
final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath)
@@ -51,7 +47,12 @@ public class DedupRecordFactory {
String idValue = json._1();
- String trust = MapDocumentUtil.getJPathString("$.dataInfo.trust", json._2());
+ String trust ="";
+ try {
+ trust = MapDocumentUtil.getJPathString("$.dataInfo.trust", json._2());
+ } catch (Throwable e) {
+
+ }
//TODO remember to replace this with the actual trust retrieving
if (StringUtils.isBlank(trust)) {
@@ -71,28 +72,32 @@ public class DedupRecordFactory {
.groupByKey();
+
+
+
+
switch(entityType){
- case Publication:
- return sortedJoinResult.map(this::publicationMerger);
- case Dataset:
- return sortedJoinResult.map(this::datasetMerger);
- case Project:
- return sortedJoinResult.map(this::projectMerger);
- case Software:
- return sortedJoinResult.map(this::softwareMerger);
- case Datasource:
- return sortedJoinResult.map(this::datasourceMerger);
- case Organization:
- return sortedJoinResult.map(this::organizationMerger);
- case OtherResearchProduct:
- return sortedJoinResult.map(this::otherresearchproductMerger);
+ case publication:
+ return sortedJoinResult.map(DedupRecordFactory::publicationMerger);
+ case dataset:
+ return sortedJoinResult.map(DedupRecordFactory::datasetMerger);
+ case project:
+ return sortedJoinResult.map(DedupRecordFactory::projectMerger);
+ case software:
+ return sortedJoinResult.map(DedupRecordFactory::softwareMerger);
+ case datasource:
+ return sortedJoinResult.map(DedupRecordFactory::datasourceMerger);
+ case organization:
+ return sortedJoinResult.map(DedupRecordFactory::organizationMerger);
+ case otherresearchproduct:
+ return sortedJoinResult.map(DedupRecordFactory::otherresearchproductMerger);
default:
return null;
}
}
- private Publication publicationMerger(Tuple2<String, Iterable<String>> e){
+ private static Publication publicationMerger(Tuple2<String, Iterable<String>> e){
Publication p = new Publication(); //the result of the merge, to be returned at the end
@@ -101,67 +106,59 @@ public class DedupRecordFactory {
final ObjectMapper mapper = new ObjectMapper();
final Collection<String> dateofacceptance = Lists.newArrayList();
- final Collection<List<Author>> authors = Lists.newArrayList();
- final Collection<List<Instance>> instances = Lists.newArrayList();
+
StringBuilder trust = new StringBuilder("0.0");
+ if (e._2() != null)
e._2().forEach(pub -> {
try {
Publication publication = mapper.readValue(pub, Publication.class);
final String currentTrust = publication.getDataInfo().getTrust();
- if (!currentTrust.equals("1.0")) {
+ if (!"1.0".equals(currentTrust)) {
trust.setLength(0);
trust.append(currentTrust);
}
-
p.mergeFrom(publication);
-
+ p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));
//add to the list if they are not null
if (publication.getDateofacceptance() != null)
dateofacceptance.add(publication.getDateofacceptance().getValue());
- if (publication.getAuthor() != null)
- authors.add(publication.getAuthor());
- if (publication.getInstance() != null)
- instances.add(publication.getInstance());
-
- } catch (Exception exc){}
-
+ } catch (Exception exc){
+ throw new RuntimeException(exc);
+ }
});
-
- p.setAuthor(null); //TODO create a single list of authors to put in the final publication
-
-
+ p.setDateofacceptance(DatePicker.pick(dateofacceptance));
return p;
}
- private Dataset datasetMerger(Tuple2<String, Iterable<String>> e){
+ private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e){
throw new NotImplementedException();
}
- private Project projectMerger(Tuple2<String, Iterable<String>> e){
+ private static Project projectMerger(Tuple2<String, Iterable<String>> e){
throw new NotImplementedException();
}
- private Software softwareMerger(Tuple2<String, Iterable<String>> e){
+ private static Software softwareMerger(Tuple2<String, Iterable<String>> e){
throw new NotImplementedException();
}
- private Datasource datasourceMerger(Tuple2<String, Iterable<String>> e){
+ private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e){
throw new NotImplementedException();
}
- private Organization organizationMerger(Tuple2<String, Iterable<String>> e){
+ private static Organization organizationMerger(Tuple2<String, Iterable<String>> e){
throw new NotImplementedException();
}
- private OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e){
+ private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e){
throw new NotImplementedException();
}
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
index b65e866f1..388ab9b69 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
@@ -1,11 +1,17 @@
package eu.dnetlib.dedup;
import com.google.common.collect.Sets;
+import com.wcohen.ss.JaroWinkler;
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
+
import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.model.Person;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -14,32 +20,35 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
+import scala.Tuple2;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import java.text.Normalizer;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DedupUtility {
+ private static final Double THRESHOLD = 0.95;
public static Map<String, LongAccumulator> constructAccumulator(final DedupConfig dedupConf, final SparkContext context) {
Map<String, LongAccumulator> accumulators = new HashMap<>();
- String acc1 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1");
+ String acc1 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "records per hash key = 1");
accumulators.put(acc1, context.longAccumulator(acc1));
- String acc2 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField());
+ String acc2 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField());
accumulators.put(acc2, context.longAccumulator(acc2));
- String acc3 = String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize()));
+ String acc3 = String.format("%s::%s", dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize()));
accumulators.put(acc3, context.longAccumulator(acc3));
- String acc4 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list");
+ String acc4 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "skip list");
accumulators.put(acc4, context.longAccumulator(acc4));
- String acc5 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)");
+ String acc5 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)");
accumulators.put(acc5, context.longAccumulator(acc5));
- String acc6 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold());
+ String acc6 = String.format("%s::%s", dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold());
accumulators.put(acc6, context.longAccumulator(acc6));
return accumulators;
@@ -52,7 +61,7 @@ public class DedupUtility {
public static void deleteIfExists(String path) throws IOException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
- if (fileSystem.exists(new Path(path))){
+ if (fileSystem.exists(new Path(path))) {
fileSystem.delete(new Path(path), true);
}
}
@@ -91,4 +100,171 @@ public class DedupUtility {
return null;
}
}
+
+
+ public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
+ int pa = countAuthorsPids(a);
+ int pb = countAuthorsPids(b);
+ List<Author> base, enrich;
+ int sa = authorsSize(a);
+ int sb = authorsSize(b);
+
+ if(pa == pb){
+ base = sa>sb?a:b;
+ enrich = sa>sb?b:a;
+ } else {
+ base = pa>pb?a:b;
+ enrich = pa>pb?b:a;
+ }
+ enrichPidFromList(base, enrich);
+ return base;
+
+
+
+// //if both have no authors with pids
+// if (pa < 1 && pb < 1) {
+// //B is bigger than A
+// if (sa < sb)
+// return b;
+// //A is bigger than B
+// else
+// return a;
+// }
+// //If A has author with pids
+// if (pa > 0) {
+// //B has no author with pid
+// if (pb < 1)
+// return a;
+// //B has author with pid
+// else {
+// enrichPidFromList(a, b);
+// return a;
+// }
+// }
+// //If B has author with pids
+// //A has no author with pid
+// if (pa < 1)
+// return b;
+// //A has author with pid
+// else {
+// enrichPidFromList(b, a);
+// return b;
+// }
+ }
+
+ private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
+ if(base==null || enrich == null)
+ return;
+ final Map<String, Author> basePidAuthorMap = base.stream()
+ .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .flatMap(a -> a.getPid()
+ .stream()
+ .map(p -> new Tuple2<>(p.toComparableString(), a))
+ ).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
+
+ final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
+ .stream()
+ .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .flatMap(a -> a.getPid().stream().filter(p -> !basePidAuthorMap.containsKey(p.toComparableString())).map(p -> new Tuple2<>(p, a)))
+ .collect(Collectors.toList());
+
+
+ pidToEnrich.forEach(a -> {
+ Optional<Tuple2<Double, Author>> simAuhtor = base.stream().map(ba -> new Tuple2<>(sim(ba, a._2()), ba)).max(Comparator.comparing(Tuple2::_1));
+ if (simAuhtor.isPresent() && simAuhtor.get()._1()> THRESHOLD) {
+ Author r = simAuhtor.get()._2();
+ r.getPid().add(a._1());
+ }
+ });
+ }
+
+ public static String createEntityPath(final String basePath, final String entityType) {
+ return String.format("%s/%s", basePath,entityType);
+ }
+
+ public static String createSimRelPath(final String basePath, final String entityType) {
+ return String.format("%s/%s_simRel", basePath,entityType);
+ }
+
+ public static String createMergeRelPath(final String basePath, final String entityType) {
+ return String.format("%s/%s_mergeRel", basePath,entityType);
+ }
+
+ private static Double sim(Author a, Author b) {
+
+ final Person pa = parse(a);
+ final Person pb = parse(b);
+
+ if (pa.isAccurate() & pb.isAccurate()) {
+ return new JaroWinkler().score(
+ normalize(pa.getSurnameString()),
+ normalize(pb.getSurnameString()));
+ } else {
+ return new JaroWinkler().score(
+ normalize(pa.getNormalisedFullname()),
+ normalize(pb.getNormalisedFullname()));
+ }
+ }
+
+ private static String normalize(final String s) {
+ return nfd(s).toLowerCase()
+ // do not compact the regexes in a single expression, would cause StackOverflowError in case of large input strings
+ .replaceAll("(\\W)+", " ")
+ .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
+ .replaceAll("(\\p{Punct})+", " ")
+ .replaceAll("(\\d)+", " ")
+ .replaceAll("(\\n)+", " ")
+ .trim();
+ }
+
+ private static String nfd(final String s) {
+ return Normalizer.normalize(s, Normalizer.Form.NFD);
+ }
+ private static Person parse(Author author) {
+ if (StringUtils.isNotBlank(author.getSurname())) {
+ return new Person(author.getSurname() + ", " + author.getName(), false);
+ } else {
+ return new Person(author.getFullname(), false);
+ }
+ }
+
+
+ private static int countAuthorsPids(List<Author> authors) {
+ if (authors == null)
+ return 0;
+
+ return (int) authors.stream().map(DedupUtility::extractAuthorPid).filter(Objects::nonNull).filter(StringUtils::isNotBlank).count();
+ }
+
+ private static int authorsSize(List<Author> authors) {
+ if (authors == null)
+ return 0;
+ return authors.size();
+ }
+
+
+ private static boolean isAccurate(final Author a) {
+ return StringUtils.isNotBlank(a.getName()) && StringUtils.isNotBlank(a.getSurname());
+ }
+
+ private static String extractAuthorPid(Author a) {
+
+ if (a == null || a.getPid() == null || a.getPid().size() == 0)
+ return null;
+
+ StringBuilder mainPid = new StringBuilder();
+
+ a.getPid().forEach(pid -> {
+ if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) {
+ mainPid.setLength(0);
+ mainPid.append(pid.getValue());
+ } else {
+ if (mainPid.length() == 0)
+ mainPid.append(pid.getValue());
+ }
+ });
+
+ return mainPid.toString();
+
+ }
}
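
mergeAuthor keeps the author list carrying more PIDs (falling back to the longer list on a tie) and copies missing PIDs onto the best surname match above the 0.95 JaroWinkler threshold. A hedged usage sketch (assumes the dhp-schemas Author/StructuredProperty/Qualifier setters used below behave as shown; names and identifiers are made up):

```java
import eu.dnetlib.dedup.DedupUtility;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeAuthorUsage {

    // helper: build an Author, optionally with a single pid of the given class id
    static Author author(String name, String surname, String pidClass, String pidValue) {
        final Author a = new Author();
        a.setName(name);
        a.setSurname(surname);
        a.setFullname(surname + ", " + name);
        if (pidValue != null) {
            final Qualifier q = new Qualifier();
            q.setClassid(pidClass);
            final StructuredProperty pid = new StructuredProperty();
            pid.setQualifier(q);
            pid.setValue(pidValue);
            a.setPid(new ArrayList<>(Arrays.asList(pid)));
        }
        return a;
    }

    public static void main(String[] args) {
        // both lists carry one pid-bearing author, so the longer list is kept as the base
        final List<Author> a = Arrays.asList(
                author("John", "Smith", "orcid", "0000-0002-1825-0097"));
        final List<Author> b = Arrays.asList(
                author("John", "Smith", "mag", "12345"),
                author("Mary", "Jones", null, null));

        final List<Author> merged = DedupUtility.mergeAuthor(a, b);

        // Smith's ORCID from list a is attached to the matching Smith in the base list b
        System.out.println(merged.get(0).getPid().size()); // expected: 2
    }
}
```
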
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafEntityType.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafEntityType.java
index 4ff2fa873..fb347ed51 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafEntityType.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafEntityType.java
@@ -2,12 +2,14 @@ package eu.dnetlib.dedup;
public enum OafEntityType {
- Datasource,
- Organization,
- Project,
- Dataset,
- OtherResearchProduct,
- Software,
- Publication
+ datasource,
+ organization,
+ project,
+ dataset,
+ otherresearchproduct,
+ software,
+ publication
+
+
}
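
Lowercasing the constants presumably lets an entity-type string taken verbatim from a workflow parameter or path segment be resolved with valueOf, matching the lowercase case labels now used in DedupRecordFactory. A hypothetical one-liner illustrating the lookup (parameter value is made up):

```java
import eu.dnetlib.dedup.OafEntityType;

public class EntityTypeLookup {
    public static void main(String[] args) {
        // an entity name read from a workflow parameter maps directly onto the enum
        final String entityTypeParam = "publication";
        final OafEntityType type = OafEntityType.valueOf(entityTypeParam);
        System.out.println(type); // prints: publication
    }
}
```
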
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
index eacf3d479..9783e93d6 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
@@ -46,7 +46,7 @@ public class SparkCreateConnectedComponent {
s -> new Tuple2