diff --git a/.gitignore b/.gitignore
index 486eacee9..3f00d9729 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,4 +18,5 @@
/*/build
/build
spark-warehouse
-/dhp-workflows/dhp-graph-mapper/job-override.properties
+/*/*/job-override.properties
+
diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index f1dd5f631..834af77fa 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -17,6 +17,7 @@
org.apache.maven.plugins
maven-compiler-plugin
+ ${maven.compiler.plugin.version}
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index d561ce5dc..4f99d5298 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -53,6 +53,7 @@
org.apache.maven.plugins
maven-compiler-plugin
+ ${maven.compiler.plugin.version}
org.apache.maven.plugins
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 6fac06b68..43c2a3834 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -17,6 +17,10 @@
commons-cli
commons-cli
+
+ commons-io
+ commons-io
+
org.apache.commons
commons-lang3
@@ -29,21 +33,15 @@
javax.persistence
javax.persistence-api
-
com.fasterxml.jackson.core
jackson-databind
-
com.rabbitmq
amqp-client
-
- commons-io
- commons-io
-
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
index a4970a928..cbfc5caf1 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
@@ -2,17 +2,25 @@ package eu.dnetlib.dhp.application;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.*;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.StringWriter;
+import java.util.*;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.Inflater;
public class ArgumentApplicationParser implements Serializable {
private final Options options = new Options();
private final Map objectMap = new HashMap<>();
+ private final List compressedValues = new ArrayList<>();
+
public ArgumentApplicationParser(final String json_configuration) throws Exception {
final ObjectMapper mapper = new ObjectMapper();
final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class);
@@ -29,6 +37,9 @@ public class ArgumentApplicationParser implements Serializable {
final Option o = new Option(conf.getParamName(), true, conf.getParamDescription());
o.setLongOpt(conf.getParamLongName());
o.setRequired(conf.isParamRequired());
+ if (conf.isCompressed()) {
+ compressedValues.add(conf.getParamLongName());
+ }
return o;
}).forEach(options::addOption);
@@ -38,10 +49,32 @@ public class ArgumentApplicationParser implements Serializable {
}
+
+ public static String decompressValue(final String abstractCompressed) {
+ try {
+ byte[] byteArray = Base64.decodeBase64(abstractCompressed.getBytes());
+ GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(byteArray));
+ final StringWriter stringWriter = new StringWriter();
+ IOUtils.copy(gis, stringWriter);
+ return stringWriter.toString();
+ } catch (Throwable e) {
+ System.out.println("Wrong value to decompress:" + abstractCompressed);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String compressArgument(final String value) throws Exception{
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ GZIPOutputStream gzip = new GZIPOutputStream(out);
+ gzip.write(value.getBytes());
+ gzip.close();
+ return java.util.Base64.getEncoder().encodeToString(out.toByteArray());
+ }
+
public void parseArgument(final String[] args) throws Exception {
CommandLineParser parser = new BasicParser();
CommandLine cmd = parser.parse(options, args);
- Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), it.getValue()));
+ Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), compressedValues.contains(it.getLongOpt())? decompressValue(it.getValue()): it.getValue()));
}
public String get(final String key) {
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java
index 92079fce7..4e7c2826b 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java
@@ -7,6 +7,7 @@ public class OptionsParameter {
private String paramLongName;
private String paramDescription;
private boolean paramRequired;
+ private boolean compressed;
public OptionsParameter() {
}
@@ -26,4 +27,12 @@ public class OptionsParameter {
public boolean isParamRequired() {
return paramRequired;
}
+
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean compressed) {
+ this.compressed = compressed;
+ }
}
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java
index 2033919b9..fdea3c2d4 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java
@@ -3,6 +3,10 @@ package eu.dnetlib.dhp.application;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
+import java.io.ByteArrayOutputStream;
+import java.util.Base64;
+import java.util.zip.GZIPOutputStream;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -24,6 +28,7 @@ public class ArgumentApplicationParserTest {
"-ro", "value7",
"-rr", "value8",
"-w", "value9",
+ "-cc", ArgumentApplicationParser.compressArgument(jsonConfiguration)
});
assertNotNull(parser.get("hdfsPath"));
assertNotNull(parser.get("apidescriptor"));
@@ -45,7 +50,12 @@ public class ArgumentApplicationParserTest {
assertEquals("value7", parser.get("rabbitOngoingQueue"));
assertEquals("value8", parser.get("rabbitReportQueue"));
assertEquals("value9", parser.get("workflowId"));
+ assertEquals(jsonConfiguration, parser.get("ccCoco"));
}
+
+
+
+
}
diff --git a/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json b/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json
index 60c2d391a..13c199166 100644
--- a/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json
+++ b/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json
@@ -1,12 +1,13 @@
[
- {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
- {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
- {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
- {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
- {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rp", "paramLongName":"rabbitPassWord", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
- {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
- {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
- {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
- {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}
+ {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
+ {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
+ {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
+ {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
+ {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
+ {"paramName":"rp", "paramLongName":"rabbitPassWord", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
+ {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
+ {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
+ {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
+ {"paramName":"cc", "paramLongName":"ccCoco", "paramDescription": "the identifier of the dnet Workflow", "compressed":true,"paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Context.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Context.java
index 64e23088e..809200463 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Context.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Context.java
@@ -23,4 +23,23 @@ public class Context implements Serializable {
public void setDataInfo(List dataInfo) {
this.dataInfo = dataInfo;
}
+
+ @Override
+ public int hashCode() {
+ return id ==null? 0 : id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ Context other = (Context) obj;
+
+ return id.equals(other.getId());
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
index 44d5226e9..27bee998e 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
@@ -19,7 +19,7 @@ public class Dataset extends Result implements Serializable {
private List geolocation;
- public Field getStoragedate() {
+ public Field getStoragedate() {
return storagedate;
}
@@ -74,4 +74,26 @@ public class Dataset extends Result implements Serializable {
public void setGeolocation(List geolocation) {
this.geolocation = geolocation;
}
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+ final Dataset d = (Dataset) e;
+
+ storagedate = d.getStoragedate() != null && compareTrust(this, e)<0? d.getStoragedate() : storagedate;
+
+ device= d.getDevice() != null && compareTrust(this, e)<0? d.getDevice() : device;
+
+ size= d.getSize() != null && compareTrust(this, e)<0? d.getSize() : size;
+
+ version= d.getVersion() != null && compareTrust(this, e)<0? d.getVersion() : version;
+
+ lastmetadataupdate= d.getLastmetadataupdate() != null && compareTrust(this, e)<0? d.getLastmetadataupdate() :lastmetadataupdate;
+
+ metadataversionnumber= d.getMetadataversionnumber() != null && compareTrust(this, e)<0? d.getMetadataversionnumber() : metadataversionnumber;
+
+ geolocation = mergeLists(geolocation, d.getGeolocation());
+
+ mergeOAFDataInfo(d);
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java
index 21408a5ec..f52a500fe 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java
@@ -78,7 +78,7 @@ public class Datasource extends OafEntity implements Serializable {
private Field certificates;
- private List< KeyValue> policies;
+ private List policies;
private Journal journal;
@@ -361,4 +361,67 @@ public class Datasource extends OafEntity implements Serializable {
public void setJournal(Journal journal) {
this.journal = journal;
}
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+
+ Datasource d = (Datasource)e;
+
+ datasourcetype = d.getDatasourcetype() != null && compareTrust(this, e)<0? d.getDatasourcetype() : datasourcetype;
+ openairecompatibility = d.getOpenairecompatibility() != null && compareTrust(this, e)<0? d.getOpenairecompatibility() : openairecompatibility;
+ officialname = d.getOfficialname() != null && compareTrust(this, e)<0? d.getOfficialname() : officialname;
+ englishname = d.getEnglishname() != null && compareTrust(this, e)<0? d.getEnglishname() : officialname;
+ websiteurl = d.getWebsiteurl() != null && compareTrust(this, e)<0? d.getWebsiteurl() : websiteurl;
+ logourl = d.getLogourl() != null && compareTrust(this, e)<0? d.getLogourl() : getLogourl();
+ contactemail = d.getContactemail() != null && compareTrust(this, e)<0? d.getContactemail() : contactemail;
+ namespaceprefix = d.getNamespaceprefix() != null && compareTrust(this, e)<0? d.getNamespaceprefix() : namespaceprefix;
+ latitude = d.getLatitude() != null && compareTrust(this, e)<0? d.getLatitude() : latitude;
+ longitude = d.getLongitude() != null && compareTrust(this, e)<0? d.getLongitude() : longitude;
+ dateofvalidation = d.getDateofvalidation() != null && compareTrust(this, e)<0? d.getDateofvalidation() : dateofvalidation;
+ description = d.getDescription() != null && compareTrust(this, e)<0? d.getDescription() : description;
+ subjects = mergeLists(subjects, d.getSubjects());
+
+ // opendoar specific fields (od*)
+ odnumberofitems = d.getOdnumberofitems() != null && compareTrust(this, e)<0? d.getOdnumberofitems() : odnumberofitems;
+ odnumberofitemsdate = d.getOdnumberofitemsdate() != null && compareTrust(this, e)<0? d.getOdnumberofitemsdate() : odnumberofitemsdate;
+ odpolicies = d.getOdpolicies() != null && compareTrust(this, e)<0? d.getOdpolicies() : odpolicies;
+ odlanguages = mergeLists(odlanguages, d.getOdlanguages());
+ odcontenttypes = mergeLists(odcontenttypes, d.getOdcontenttypes());
+ accessinfopackage = mergeLists(accessinfopackage, d.getAccessinfopackage());
+
+ // re3data fields
+ releasestartdate = d.getReleasestartdate() != null && compareTrust(this, e)<0? d.getReleasestartdate() : releasestartdate;
+ releaseenddate = d.getReleaseenddate() != null && compareTrust(this, e)<0? d.getReleaseenddate() : releaseenddate;
+ missionstatementurl = d.getMissionstatementurl() != null && compareTrust(this, e)<0? d.getMissionstatementurl() : missionstatementurl;
+ dataprovider = d.getDataprovider() != null && compareTrust(this, e)<0? d.getDataprovider() : dataprovider;
+ serviceprovider = d.getServiceprovider() != null && compareTrust(this, e)<0? d.getServiceprovider() : serviceprovider;
+
+ // {open, restricted or closed}
+ databaseaccesstype = d.getDatabaseaccesstype() != null && compareTrust(this, e)<0? d.getDatabaseaccesstype() : databaseaccesstype;
+
+ // {open, restricted or closed}
+ datauploadtype = d.getDatauploadtype() != null && compareTrust(this, e)<0? d.getDatauploadtype() : datauploadtype;
+
+ // {feeRequired, registration, other}
+ databaseaccessrestriction = d.getDatabaseaccessrestriction() != null && compareTrust(this, e)<0? d.getDatabaseaccessrestriction() : databaseaccessrestriction;
+
+ // {feeRequired, registration, other}
+ datauploadrestriction = d.getDatauploadrestriction() != null && compareTrust(this, e)<0? d.getDatauploadrestriction() : datauploadrestriction;
+
+ versioning = d.getVersioning() != null && compareTrust(this, e)<0? d.getVersioning() : versioning;
+ citationguidelineurl = d.getCitationguidelineurl() != null && compareTrust(this, e)<0? d.getCitationguidelineurl() : citationguidelineurl;
+
+ //{yes, no, unknown}
+ qualitymanagementkind = d.getQualitymanagementkind() != null && compareTrust(this, e)<0? d.getQualitymanagementkind() : qualitymanagementkind;
+ pidsystems = d.getPidsystems() != null && compareTrust(this, e)<0? d.getPidsystems() : pidsystems;
+
+ certificates = d.getCertificates() != null && compareTrust(this, e)<0? d.getCertificates() : certificates;
+
+ policies = mergeLists(policies, d.getPolicies());
+
+ journal = d.getJournal() != null && compareTrust(this, e)<0? d.getJournal() : journal;
+
+ mergeOAFDataInfo(e);
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
index 8834900c9..2ab0b4d3c 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
@@ -23,4 +23,21 @@ public class Field implements Serializable {
public void setDataInfo(DataInfo dataInfo) {
this.dataInfo = dataInfo;
}
+
+ @Override
+ public int hashCode() {
+ return getValue() == null ? 0 : getValue().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Field other = (Field) obj;
+ return getValue().equals(other.getValue());
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java
index 567254a23..43af60286 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java
@@ -1,5 +1,7 @@
package eu.dnetlib.dhp.schema.oaf;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
public class GeoLocation implements Serializable {
@@ -33,4 +35,35 @@ public class GeoLocation implements Serializable {
public void setPlace(String place) {
this.place = place;
}
+
+
+ public boolean isBlank() {
+ return StringUtils.isBlank(point) &&
+ StringUtils.isBlank(box) &&
+ StringUtils.isBlank(place);
+ }
+
+ public String toComparableString() {
+ return isBlank()?"":String.format("%s::%s%s", point != null ? point.toLowerCase() : "", box != null ? box.toLowerCase() : "", place != null ? place.toLowerCase() : "");
+ }
+
+ @Override
+ public int hashCode() {
+ return toComparableString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ GeoLocation other = (GeoLocation) obj;
+
+ return toComparableString()
+ .equals(other.toComparableString());
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
index 8726b85ce..8f852af65 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
@@ -85,4 +85,34 @@ public class Instance implements Serializable {
public void setDateofacceptance(Field dateofacceptance) {
this.dateofacceptance = dateofacceptance;
}
+
+
+
+ public String toComparableString(){
+ return String.format("%s::%s::%s::%s",
+ hostedby != null && hostedby.getKey()!= null ? hostedby.getKey().toLowerCase() : "",
+ accessright!= null && accessright.getClassid()!= null ? accessright.getClassid() : "",
+ instancetype!= null && instancetype.getClassid()!= null ? instancetype.getClassid() : "",
+ url != null ? url:"");
+ }
+
+ @Override
+ public int hashCode() {
+ return toComparableString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ Instance other = (Instance) obj;
+
+ return toComparableString()
+ .equals(other.toComparableString());
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java
index 1c4c7e6ef..74d9f77bd 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/KeyValue.java
@@ -1,5 +1,7 @@
package eu.dnetlib.dhp.schema.oaf;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
public class KeyValue implements Serializable {
@@ -33,4 +35,31 @@ public class KeyValue implements Serializable {
public void setDataInfo(DataInfo dataInfo) {
this.dataInfo = dataInfo;
}
+
+ public String toComparableString() {
+ return isBlank()?"":String.format("%s::%s", key != null ? key.toLowerCase() : "", value != null ? value.toLowerCase() : "");
+ }
+
+ public boolean isBlank() {
+ return StringUtils.isBlank(key) && StringUtils.isBlank(value);
+ }
+
+ @Override
+ public int hashCode() {
+ return toComparableString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ KeyValue other = (KeyValue) obj;
+
+ return toComparableString().equals(other.toComparableString());
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java
index 010633ec3..cc2ab8428 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java
@@ -1,8 +1,5 @@
package eu.dnetlib.dhp.schema.oaf;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import java.io.Serializable;
public abstract class Oaf implements Serializable {
@@ -27,13 +24,23 @@ public abstract class Oaf implements Serializable {
this.lastupdatetimestamp = lastupdatetimestamp;
}
- @Override
- public String toString() {
- try {
- return new ObjectMapper().writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
+
+ public void mergeOAFDataInfo(Oaf e) {
+ if (e.getDataInfo()!= null && compareTrust(this,e)<0)
+ dataInfo = e.getDataInfo();
}
+ protected String extractTrust(Oaf e) {
+ if (e == null || e.getDataInfo()== null || e.getDataInfo().getTrust()== null)
+ return "0.0";
+ return e.getDataInfo().getTrust();
+
+
+
+ }
+
+ protected int compareTrust(Oaf a, Oaf b) {
+ return extractTrust(a).compareTo(extractTrust(b));
+
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
index 791667b46..8a86f822d 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
@@ -1,7 +1,8 @@
package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
public abstract class OafEntity extends Oaf implements Serializable {
@@ -84,4 +85,36 @@ public abstract class OafEntity extends Oaf implements Serializable {
public void setOaiprovenance(OAIProvenance oaiprovenance) {
this.oaiprovenance = oaiprovenance;
}
+
+
+ public void mergeFrom(OafEntity e) {
+
+ if (e == null)
+ return;
+
+ originalId = mergeLists(originalId, e.getOriginalId());
+
+ collectedfrom = mergeLists(collectedfrom, e.getCollectedfrom());
+
+ pid = mergeLists(pid, e.getPid());
+
+ if (e.getDateofcollection() != null && compareTrust(this, e) < 0)
+ dateofcollection = e.getDateofcollection();
+
+ if (e.getDateoftransformation() != null && compareTrust(this, e) < 0)
+ dateoftransformation = e.getDateoftransformation();
+
+ extraInfo = mergeLists(extraInfo, e.getExtraInfo());
+
+ if (e.getOaiprovenance() != null && compareTrust(this, e) < 0)
+ oaiprovenance = e.getOaiprovenance();
+
+ }
+
+ protected List mergeLists(final List... lists) {
+
+ return Arrays.stream(lists).filter(Objects::nonNull).flatMap(List::stream).distinct().collect(Collectors.toList());
+ }
+
+
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java
index 6f89eca7e..b0dffb485 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java
@@ -164,4 +164,28 @@ public class Organization extends OafEntity implements Serializable {
public void setCountry(Qualifier country) {
this.country = country;
}
+
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+ final Organization o = (Organization) e;
+ legalshortname = o.getLegalshortname() != null && compareTrust(this, e)<0? o.getLegalshortname() : legalshortname;
+ legalname = o.getLegalname() != null && compareTrust(this, e)<0 ? o.getLegalname() : legalname;
+ alternativeNames = mergeLists(o.getAlternativeNames(), alternativeNames);
+ websiteurl = o.getWebsiteurl() != null && compareTrust(this, e)<0? o.getWebsiteurl() : websiteurl;
+ logourl = o.getLogourl() != null && compareTrust(this, e)<0? o.getLogourl() : logourl;
+ eclegalbody = o.getEclegalbody() != null && compareTrust(this, e)<0? o.getEclegalbody() : eclegalbody;
+ eclegalperson = o.getEclegalperson() != null && compareTrust(this, e)<0? o.getEclegalperson() : eclegalperson;
+ ecnonprofit = o.getEcnonprofit() != null && compareTrust(this, e)<0? o.getEcnonprofit() : ecnonprofit;
+ ecresearchorganization = o.getEcresearchorganization() != null && compareTrust(this, e)<0? o.getEcresearchorganization() : ecresearchorganization;
+ echighereducation = o.getEchighereducation() != null && compareTrust(this, e)<0? o.getEchighereducation() : echighereducation;
+ ecinternationalorganizationeurinterests = o.getEcinternationalorganizationeurinterests() != null && compareTrust(this, e)<0? o.getEcinternationalorganizationeurinterests() : ecinternationalorganizationeurinterests;
+ ecinternationalorganization = o.getEcinternationalorganization() != null && compareTrust(this, e)<0? o.getEcinternationalorganization() : ecinternationalorganization;
+ ecenterprise = o.getEcenterprise() != null && compareTrust(this, e)<0? o.getEcenterprise() :ecenterprise;
+ ecsmevalidated = o.getEcsmevalidated() != null && compareTrust(this, e)<0? o.getEcsmevalidated() :ecsmevalidated;
+ ecnutscode = o.getEcnutscode() != null && compareTrust(this, e)<0? o.getEcnutscode() :ecnutscode;
+ country = o.getCountry() != null && compareTrust(this, e)<0 ? o.getCountry() :country;
+ mergeOAFDataInfo(o);
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java
index 5f32b8895..72bec727e 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java
@@ -34,4 +34,16 @@ public class OtherResearchProduct extends Result implements Serializable {
public void setTool(List> tool) {
this.tool = tool;
}
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+
+ OtherResearchProduct o = (OtherResearchProduct)e;
+
+ contactperson = mergeLists(contactperson, o.getContactperson());
+ contactgroup = mergeLists(contactgroup, o.getContactgroup());
+ tool = mergeLists(tool, o.getTool());
+ mergeOAFDataInfo(e);
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
index 65f22da37..0bc11bb41 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
@@ -264,4 +264,39 @@ public class Project extends OafEntity implements Serializable {
public void setFundedamount(Float fundedamount) {
this.fundedamount = fundedamount;
}
+
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+ Project p = (Project)e;
+
+ websiteurl= p.getWebsiteurl()!= null && compareTrust(this,e)<0?p.getWebsiteurl():websiteurl;
+ code= p.getCode()!=null && compareTrust(this,e)<0?p.getCode():code;
+ acronym= p.getAcronym()!= null && compareTrust(this,e)<0?p.getAcronym():acronym;
+ title= p.getTitle()!= null && compareTrust(this,e)<0?p.getTitle():title;
+ startdate= p.getStartdate()!=null && compareTrust(this,e)<0?p.getStartdate():startdate;
+ enddate= p.getEnddate()!=null && compareTrust(this,e)<0?p.getEnddate():enddate;
+ callidentifier= p.getCallidentifier()!=null && compareTrust(this,e)<0?p.getCallidentifier():callidentifier;
+ keywords= p.getKeywords()!=null && compareTrust(this,e)<0?p.getKeywords():keywords;
+ duration= p.getDuration()!=null && compareTrust(this,e)<0?p.getDuration():duration;
+ ecsc39= p.getEcsc39()!=null && compareTrust(this,e)<0?p.getEcsc39():ecsc39;
+ oamandatepublications= p.getOamandatepublications()!=null && compareTrust(this,e)<0?p.getOamandatepublications():oamandatepublications;
+ ecarticle29_3= p.getEcarticle29_3()!=null && compareTrust(this,e)<0?p.getEcarticle29_3():ecarticle29_3;
+ subjects= mergeLists(subjects, p.getSubjects());
+ fundingtree= mergeLists(fundingtree, p.getFundingtree());
+ contracttype= p.getContracttype()!=null && compareTrust(this,e)<0?p.getContracttype():contracttype;
+ optional1= p.getOptional1()!=null && compareTrust(this,e)<0?p.getOptional1():optional1;
+ optional2= p.getOptional2()!=null && compareTrust(this,e)<0?p.getOptional2():optional2;
+ jsonextrainfo= p.getJsonextrainfo()!=null && compareTrust(this,e)<0?p.getJsonextrainfo():jsonextrainfo;
+ contactfullname= p.getContactfullname()!=null && compareTrust(this,e)<0?p.getContactfullname():contactfullname;
+ contactfax= p.getContactfax()!=null && compareTrust(this,e)<0?p.getContactfax():contactfax;
+ contactphone= p.getContactphone()!=null && compareTrust(this,e)<0?p.getContactphone():contactphone;
+ contactemail= p.getContactemail()!=null && compareTrust(this,e)<0?p.getContactemail():contactemail;
+ summary= p.getSummary()!=null && compareTrust(this,e)<0?p.getSummary():summary;
+ currency= p.getCurrency()!=null && compareTrust(this,e)<0?p.getCurrency():currency;
+ totalcost= p.getTotalcost()!=null && compareTrust(this,e)<0?p.getTotalcost():totalcost;
+ fundedamount= p.getFundedamount()!= null && compareTrust(this,e)<0?p.getFundedamount():fundedamount;
+ mergeOAFDataInfo(e);
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java
index 9ca9cd3d6..bb6990c1d 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java
@@ -14,4 +14,17 @@ public class Publication extends Result implements Serializable {
public void setJournal(Journal journal) {
this.journal = journal;
}
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+
+ Publication p = (Publication) e;
+
+ if (p.getJournal() != null && compareTrust(this, e)<0)
+ journal = p.getJournal();
+ mergeOAFDataInfo(e);
+ }
+
+
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java
index 60889535d..7e4660f4b 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Qualifier.java
@@ -1,5 +1,7 @@
package eu.dnetlib.dhp.schema.oaf;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
public class Qualifier implements Serializable {
@@ -40,4 +42,37 @@ public class Qualifier implements Serializable {
public void setSchemename(String schemename) {
this.schemename = schemename;
}
+
+ public String toComparableString() {
+ return isBlank()?"": String.format("%s::%s::%s::%s",
+ classid != null ? classid : "",
+ classname != null ? classname : "",
+ schemeid != null ? schemeid : "",
+ schemename != null ? schemename : "");
+ }
+ public boolean isBlank() {
+ return StringUtils.isBlank(classid) &&
+ StringUtils.isBlank(classname) &&
+ StringUtils.isBlank(schemeid) &&
+ StringUtils.isBlank(schemename);
+ }
+ @Override
+ public int hashCode() {
+ return toComparableString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ Qualifier other = (Qualifier) obj;
+
+ return toComparableString()
+ .equals(other.toComparableString());
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 7b08e71c2..eb5572ce1 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -1,7 +1,10 @@
package eu.dnetlib.dhp.schema.oaf;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
public abstract class Result extends OafEntity implements Serializable {
@@ -12,35 +15,35 @@ public abstract class Result extends OafEntity implements Serializable {
// common fields
private Qualifier language;
-
+
private List country;
private List subject;
-
+
private List title;
-
+
private List relevantdate;
private List> description;
-
+
private Field dateofacceptance;
-
+
private Field publisher;
-
+
private Field embargoenddate;
-
+
private List> source;
-
+
private List> fulltext; // remove candidate
-
+
private List> format;
-
+
private List> contributor;
-
+
private Qualifier resourcetype;
-
+
private List> coverage;
-
+
private Field refereed; //peer-review status
private List context;
@@ -240,4 +243,76 @@ public abstract class Result extends OafEntity implements Serializable {
this.processingchargecurrency = processingchargecurrency;
return this;
}
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+
+ Result r = (Result) e;
+
+ instance = mergeLists(instance, r.getInstance());
+
+ if (r.getResulttype() != null && compareTrust(this, r) < 0)
+ resulttype = r.getResulttype();
+
+ if (r.getLanguage() != null && compareTrust(this, r) < 0)
+ language = r.getLanguage();
+
+ country = mergeLists(country, r.getCountry());
+
+ subject = mergeLists(subject, r.getSubject());
+
+ title = mergeLists(title, r.getTitle());
+
+ relevantdate = mergeLists(relevantdate, r.getRelevantdate());
+
+ description = longestLists(description, r.getDescription());
+
+ if (r.getPublisher() != null && compareTrust(this, r) < 0)
+ publisher = r.getPublisher();
+
+ if (r.getEmbargoenddate() != null && compareTrust(this, r) < 0)
+ embargoenddate = r.getEmbargoenddate();
+
+ source = mergeLists(source, r.getSource());
+
+ fulltext = mergeLists(fulltext, r.getFulltext());
+
+ format = mergeLists(format, r.getFormat());
+
+ contributor = mergeLists(contributor, r.getContributor());
+
+ if (r.getResourcetype() != null)
+ resourcetype = r.getResourcetype();
+
+ coverage = mergeLists(coverage, r.getCoverage());
+
+ if (r.getRefereed() != null && compareTrust(this, r) < 0)
+ refereed = r.getRefereed();
+
+ context = mergeLists(context, r.getContext());
+
+ if (r.getProcessingchargeamount() != null && compareTrust(this, r) < 0)
+ processingchargeamount = r.getProcessingchargeamount();
+
+ if (r.getProcessingchargecurrency() != null && compareTrust(this, r) < 0)
+ processingchargecurrency = r.getProcessingchargecurrency();
+
+ externalReference = mergeLists(externalReference, r.getExternalReference());
+
+ }
+
+
+ private List> longestLists(List> a, List> b) {
+ if (a == null || b == null)
+ return a == null ? b : a;
+ if (a.size() == b.size()) {
+ int msa = a.stream().filter(i -> i.getValue() != null).map(i -> i.getValue().length()).max(Comparator.naturalOrder()).orElse(0);
+ int msb = b.stream().filter(i -> i.getValue() != null).map(i -> i.getValue().length()).max(Comparator.naturalOrder()).orElse(0);
+ return msa > msb ? a : b;
+ }
+ return a.size() > b.size() ? a : b;
+ }
+
+
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
index e696cdb52..7f835fdf8 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
@@ -44,4 +44,19 @@ public class Software extends Result implements Serializable {
public void setProgrammingLanguage(Qualifier programmingLanguage) {
this.programmingLanguage = programmingLanguage;
}
+
+ @Override
+ public void mergeFrom(OafEntity e) {
+ super.mergeFrom(e);
+ final Software s = (Software) e;
+ documentationUrl = mergeLists(documentationUrl, s.getDocumentationUrl());
+
+ license = mergeLists(license, s.getLicense());
+
+ codeRepositoryUrl = s.getCodeRepositoryUrl()!= null && compareTrust(this, s)<0?s.getCodeRepositoryUrl():codeRepositoryUrl;
+
+ programmingLanguage= s.getProgrammingLanguage()!= null && compareTrust(this, s)<0?s.getProgrammingLanguage():programmingLanguage;
+
+ mergeOAFDataInfo(e);
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java
index 79ebdd7f9..f6c6b7335 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/StructuredProperty.java
@@ -33,4 +33,28 @@ public class StructuredProperty implements Serializable {
public void setDataInfo(DataInfo dataInfo) {
this.dataInfo = dataInfo;
}
+
+ public String toComparableString(){
+ return value != null ? value.toLowerCase() : "";
+ }
+
+ @Override
+ public int hashCode() {
+ return toComparableString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ StructuredProperty other = (StructuredProperty) obj;
+
+ return toComparableString()
+ .equals(other.toComparableString());
+ }
}
diff --git a/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/oaf/MergeTest.java b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/oaf/MergeTest.java
new file mode 100644
index 000000000..e487ddcba
--- /dev/null
+++ b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/oaf/MergeTest.java
@@ -0,0 +1,89 @@
+package eu.dnetlib.dhp.schema.oaf;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MergeTest {
+
+ OafEntity oaf;
+
+ @Before
+ public void setUp() {
+ oaf = new Publication();
+ }
+
+ @Test
+ public void mergeListsTest() {
+
+ //string list merge test
+ List a = Arrays.asList("a", "b", "c", "e");
+ List b = Arrays.asList("a", "b", "c", "d");
+ List c = null;
+
+ System.out.println("merge result 1 = " + oaf.mergeLists(a, b));
+
+ System.out.println("merge result 2 = " + oaf.mergeLists(a, c));
+
+ System.out.println("merge result 3 = " + oaf.mergeLists(c, c));
+ }
+
+ @Test
+ public void mergePublicationCollectedFromTest() {
+
+ Publication a = new Publication();
+ Publication b = new Publication();
+
+ a.setCollectedfrom(Arrays.asList(setKV("a", "open"), setKV("b", "closed")));
+ b.setCollectedfrom(Arrays.asList(setKV("A", "open"), setKV("b", "Open")));
+
+ a.mergeFrom(b);
+
+ Assert.assertNotNull(a.getCollectedfrom());
+ Assert.assertEquals(3, a.getCollectedfrom().size());
+
+ }
+
+ @Test
+ public void mergePublicationSubjectTest() {
+
+ Publication a = new Publication();
+ Publication b = new Publication();
+
+ a.setSubject(Arrays.asList(setSP("a", "open", "classe"), setSP("b", "open", "classe")));
+ b.setSubject(Arrays.asList(setSP("A", "open", "classe"), setSP("c", "open", "classe")));
+
+ a.mergeFrom(b);
+
+ Assert.assertNotNull(a.getSubject());
+ Assert.assertEquals(3, a.getSubject().size());
+
+ }
+
+ private KeyValue setKV(final String key, final String value) {
+
+ KeyValue k = new KeyValue();
+
+ k.setKey(key);
+ k.setValue(value);
+
+ return k;
+ }
+
+ private StructuredProperty setSP(final String value, final String schema, final String classname) {
+ StructuredProperty s = new StructuredProperty();
+ s.setValue(value);
+ Qualifier q = new Qualifier();
+ q.setClassname(classname);
+ q.setClassid(classname);
+ q.setSchemename(schema);
+ q.setSchemeid(schema);
+ s.setQualifier(q);
+ return s;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 10d4ff79a..328e783c4 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -7,6 +7,8 @@
1.0.5-SNAPSHOT
dhp-aggregation
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index d517cca00..5e5e42f1e 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -89,6 +89,8 @@ public class TransformationJobTest {
"-rh", "",
"-ro", "",
"-rr", ""});
+
+
}
@Test
@@ -96,7 +98,7 @@ public class TransformationJobTest {
final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
System.out.println("path = " + path);
- Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output");
+ Path tempDirWithPrefix = Files.createTempDirectory("mdstore_output");
System.out.println(tempDirWithPrefix.toFile().getAbsolutePath());
diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml
new file mode 100644
index 000000000..28ef6a453
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ dhp-workflows
+ eu.dnetlib.dhp
+ 1.0.5-SNAPSHOT
+
+ 4.0.0
+
+ dhp-dedup
+
+
+
+
+ org.apache.spark
+ spark-core_2.11
+
+
+ org.apache.spark
+ spark-sql_2.11
+
+
+
+ eu.dnetlib.dhp
+ dhp-common
+ ${project.version}
+
+
+ eu.dnetlib.dhp
+ dhp-schemas
+ ${project.version}
+
+
+ com.arakelian
+ java-jq
+
+
+
+ eu.dnetlib
+ dnet-pace-core
+
+
+ org.apache.spark
+ spark-graphx_2.11
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DatePicker.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DatePicker.java
new file mode 100644
index 000000000..73f178edc
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DatePicker.java
@@ -0,0 +1,119 @@
+package eu.dnetlib.dedup;
+
+import eu.dnetlib.dhp.schema.oaf.Field;
+import org.apache.commons.lang.StringUtils;
+
+import java.time.Year;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.reverseOrder;
+import static java.util.Map.Entry.comparingByValue;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang.StringUtils.endsWith;
+import static org.apache.commons.lang.StringUtils.substringBefore;
+
+public class DatePicker {
+
+ private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
+ private static final String DATE_DEFAULT_SUFFIX = "01-01";
+ private static final int YEAR_LB = 1300;
+ private static final int YEAR_UB = Year.now().getValue() + 5;
+
+ public static Field pick(final Collection dateofacceptance) {
+
+ final Map frequencies = dateofacceptance
+ .parallelStream()
+ .filter(StringUtils::isNotBlank)
+ .collect(
+ Collectors.toConcurrentMap(
+ w -> w, w -> 1, Integer::sum));
+
+ if (frequencies.isEmpty()) {
+ return new Field<>();
+ }
+
+ final Field date = new Field<>();
+ date.setValue(frequencies.keySet().iterator().next());
+
+ // let's sort this map by values first, filtering out invalid dates
+ final Map sorted = frequencies
+ .entrySet()
+ .stream()
+ .filter(d -> StringUtils.isNotBlank(d.getKey()))
+ .filter(d -> d.getKey().matches(DATE_PATTERN))
+ .filter(d -> inRange(d.getKey()))
+ .sorted(reverseOrder(comparingByValue()))
+ .collect(
+ toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue, (e1, e2) -> e2,
+ LinkedHashMap::new));
+
+ // shortcut
+ if (sorted.size() == 0) {
+ return date;
+ }
+
+ // voting method (1/3 + 1) wins
+ if (sorted.size() >= 3) {
+ final int acceptThreshold = (sorted.size() / 3) + 1;
+ final List accepted = sorted.entrySet().stream()
+ .filter(e -> e.getValue() >= acceptThreshold)
+ .map(e -> e.getKey())
+ .collect(Collectors.toList());
+
+ // cannot find strong majority
+ if (accepted.isEmpty()) {
+ final int max = sorted.values().iterator().next();
+ Optional first = sorted.entrySet().stream()
+ .filter(e -> e.getValue() == max && !endsWith(e.getKey(), DATE_DEFAULT_SUFFIX))
+ .map(Map.Entry::getKey)
+ .findFirst();
+ if (first.isPresent()) {
+ date.setValue(first.get());
+ return date;
+ }
+
+ date.setValue(sorted.keySet().iterator().next());
+ return date;
+ }
+
+ if (accepted.size() == 1) {
+ date.setValue(accepted.get(0));
+ return date;
+ } else {
+ final Optional first = accepted.stream()
+ .filter(d -> !endsWith(d, DATE_DEFAULT_SUFFIX))
+ .findFirst();
+ if (first.isPresent()) {
+ date.setValue(first.get());
+ return date;
+ }
+
+ return date;
+ }
+
+ //1st non YYYY-01-01 is returned
+ } else {
+ if (sorted.size() == 2) {
+ for (Map.Entry e : sorted.entrySet()) {
+ if (!endsWith(e.getKey(), DATE_DEFAULT_SUFFIX)) {
+ date.setValue(e.getKey());
+ return date;
+ }
+ }
+ }
+
+ // none of the dates seems good enough, return the 1st one
+ date.setValue(sorted.keySet().iterator().next());
+ return date;
+ }
+ }
+
+ private static boolean inRange(final String date) {
+ final int year = Integer.parseInt(substringBefore(date, "-"));
+ return year >= YEAR_LB && year <= YEAR_UB;
+ }
+
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
new file mode 100644
index 000000000..5f81669e9
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java
@@ -0,0 +1,279 @@
+package eu.dnetlib.dedup;
+
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.codehaus.jackson.map.ObjectMapper;
+import scala.Tuple2;
+
+import java.util.Collection;
+import java.util.Random;
+
+import static java.util.stream.Collectors.toMap;
+
+public class DedupRecordFactory {
+
+ public static JavaRDD createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {
+ long ts = System.currentTimeMillis();
+ //
+ final JavaPairRDD inputJsonEntities = sc.textFile(entitiesInputPath)
+ .mapToPair((PairFunction) it ->
+ new Tuple2(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it)
+ );
+
+ //
+
@@ -523,6 +525,7 @@
org.apache.maven.plugins
maven-failsafe-plugin
+ ${maven.failsave.plugin.version}
default-integration-test
diff --git a/pom.xml b/pom.xml
index e385ff6eb..aedf5ebff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,12 @@
${dhp.spark.version}
provided
+
+ org.apache.spark
+ spark-graphx_2.11
+ ${dhp.spark.version}
+ provided
+
org.apache.commons
@@ -177,6 +183,7 @@
${dhp.jackson.version}
provided
+
com.fasterxml.jackson.core
jackson-annotations
@@ -190,6 +197,12 @@
provided
+
+ eu.dnetlib
+ dnet-pace-core
+ 4.0.0-SNAPSHOT
+
+
javax.persistence
@@ -203,6 +216,21 @@
amqp-client
5.6.0
+
+ com.jayway.jsonpath
+ json-path
+ 2.4.0
+
+
+ com.arakelian
+ java-jq
+ 0.10.1
+
+
+ edu.cmu
+ secondstring
+ 1.0.0
+
org.apache.oozie
@@ -230,7 +258,7 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.6.0
+ ${maven.compiler.plugin.version}
1.8
@@ -259,27 +287,6 @@
-
- eu.dnetlib
- protoc-jar-maven-plugin
- 1.1.0
-
-
- generate-sources
-
- run
-
-
- ${google.protobuf.version}
-
- src/main/resources
-
- src/gen/java
-
-
-
-
-
org.apache.maven.plugins
maven-surefire-plugin
@@ -342,6 +349,31 @@
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 4.0.1
+
+
+ scala-compile-first
+ initialize
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+ ${scala.version}
+
+
@@ -380,14 +412,15 @@
UTF-8
UTF-8
-
+ 3.6.0
+ 2.22.2
cdh5.9.2
2.6.0-${dhp.cdh.version}
4.1.0-${dhp.cdh.version}
2.4.0.cloudera2
2.9.6
3.5
- 2.11.8
+ 2.11.12