diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 8bae191d38..2a89a26fd3 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-build</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
</parent>
<artifactId>dhp-build-assembly-resources</artifactId>
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index ad8cd57b4a..5be114e3c6 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-build</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
</parent>
<artifactId>dhp-build-properties-maven-plugin</artifactId>
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 08f5de9ee8..515ed35cee 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-code-style</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
<packaging>jar</packaging>
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index 369e25b24b..d2dcbc36e6 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
</parent>
<artifactId>dhp-build</artifactId>
<packaging>pom</packaging>
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 60e66f45a2..0e7652dd3e 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
<relativePath>../</relativePath>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
index e793e3f295..c6c9d80447 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.common;
import java.io.Serializable;
+import java.util.function.Consumer;
import java.util.function.Supplier;
/** Provides serializable and throwing extensions to standard functional interfaces. */
@@ -10,6 +11,16 @@ public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
}
+ /**
+ * Serializable consumer of any kind of objects. To be used within spark processing pipelines when supplying
+ * functions externally.
+ *
+ * @param <T> the type of the consumed objects
+ */
+ @FunctionalInterface
+ public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
+ }
+
/**
 * Serializable supplier of any kind of objects. To be used within spark processing pipelines when supplying
 * functions externally.
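
Review note: a quick, hedged usage sketch — the value of these interfaces is that plain `java.util.function` lambdas are not `Serializable`, so Spark cannot ship them to executors when they are stored in fields or passed through generic APIs; declaring the lambda as the serializable subtype fixes that. The `logId` name and the id literal are illustrative:

```java
// assigning the lambda to the serializable subtype makes it safe to capture in Spark closures
FunctionalInterfaceSupport.SerializableConsumer<String> logId = id -> System.out.println("processing: " + id);
logId.accept("50|someResultId"); // hypothetical identifier
```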
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
index 1909ddcca6..6e02ca614b 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
@@ -16,6 +16,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
+/**
+ * PacePerson tries to derive information from the fullname string of an author. Such information includes the names,
+ * the surnames and the fullname split into terms. It also provides an additional field for the original data. The
+ * computation of names and surnames is not always possible: when it is impossible to tell which terms are names and
+ * which are surnames, the lists are left empty.
+ */
public class PacePerson {
private static final String UTF8 = "UTF-8";
@@ -26,10 +32,19 @@ public class PacePerson {
private static Set<String> particles = null;
+ /**
+ * Capitalizes a string.
+ *
+ * @param s the string to capitalize
+ * @return the capitalized input string
+ */
public static final String capitalize(final String s) {
return WordUtils.capitalize(s.toLowerCase(), ' ', '-');
}
+ /**
+ * Adds a dot to a string whose length equals 1 (i.e. abbreviates an initial)
+ */
public static final String dotAbbreviations(final String s) {
return s.length() == 1 ? s + "." : s;
}
@@ -46,6 +61,12 @@ public class PacePerson {
return h;
}
+ /**
+ * The constructor of the class. It fills in the fields of the class based on the input fullname.
+ *
+ * @param s the input string (fullname of the author)
+ * @param aggressive sets the string normalization type
+ */
public PacePerson(String s, final boolean aggressive) {
original = s;
s = Normalizer.normalize(s, Normalizer.Form.NFD);
@@ -64,6 +85,7 @@ public class PacePerson {
// s = s.replaceAll("[\\W&&[^,-]]", "");
}
+ // if the string contains a comma, surname and name can be derived by splitting on it
if (s.contains(",")) {
final String[] arr = s.split(",");
if (arr.length == 1) {
@@ -74,21 +96,23 @@ public class PacePerson {
fullname.addAll(surname);
fullname.addAll(name);
}
- } else {
+ } else { // otherwise, rely on CAPS terms and short (single-character) terms
fullname = splitTerms(s);
int lastInitialPosition = fullname.size();
boolean hasSurnameInUpperCase = false;
+ // computes lastInitialPosition and hasSurnameInUpperCase
for (int i = 0; i < fullname.size(); i++) {
final String term = fullname.get(i);
if (term.length() == 1) {
- lastInitialPosition = i;
+ lastInitialPosition = i; // record the position of the last single-character term (an initial)
} else if (term.equals(term.toUpperCase())) {
- hasSurnameInUpperCase = true;
+ hasSurnameInUpperCase = true; // at least one of the terms is all CAPS (presumably the surname)
}
}
+ // handles the remaining particular fullname patterns
if (lastInitialPosition < fullname.size() - 1) { // Case: Michele G. Artini
name = fullname.subList(0, lastInitialPosition + 1);
surname = fullname.subList(lastInitialPosition + 1, fullname.size());
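
Review note: a minimal sketch of the two parsing paths described in the comments above (the comma split and the CAPS-surname heuristic). The first case matches the unit test added below; the second is an assumption based on the `hasSurnameInUpperCase` logic:

```java
PacePerson p1 = new PacePerson("Artini, Michele", false); // comma path
// p1.getSurnameString() -> "Artini", p1.getNameString() -> "Michele"

PacePerson p2 = new PacePerson("ARTINI Michele", false); // CAPS-surname path
// the all-CAPS term should be detected as the surname
```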
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java
new file mode 100644
index 0000000000..5ebd7213e8
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java
@@ -0,0 +1,27 @@
+
+package eu.dnetlib.dhp.common;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+public class PacePersonTest {
+
+ @Test
+ public void pacePersonTest1() {
+
+ PacePerson p = new PacePerson("Artini, Michele", false);
+ assertEquals("Artini", p.getSurnameString());
+ assertEquals("Michele", p.getNameString());
+ assertEquals("Artini, Michele", p.getNormalisedFullname());
+ }
+
+ @Test
+ public void pacePersonTest2() {
+ PacePerson p = new PacePerson("Michele G. Artini", false);
+ assertEquals("Artini, Michele G.", p.getNormalisedFullname());
+ assertEquals("Michele G", p.getNameString());
+ assertEquals("Artini", p.getSurnameString());
+ }
+
+}
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 5e864cf940..56fb8ead2a 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
<relativePath>../</relativePath>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index e32dd10fa9..a3c1610db9 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -14,6 +14,7 @@ public class ModelConstants {
public static final String DNET_DATA_CITE_RESOURCE = "dnet:dataCite_resource";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public static final String DNET_COUNTRY_TYPE = "dnet:countries";
+ public static final String DNET_REVIEW_LEVELS = "dnet:review_levels";
public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
public static final String SYSIMPORT_CROSSWALK_ENTITYREGISTRY = "sysimport:crosswalk:entityregistry";
@@ -25,6 +26,10 @@ public class ModelConstants {
public static final String ORP_RESULTTYPE_CLASSID = "other";
public static final String RESULT_RESULT = "resultResult";
+ /**
+ * @deprecated Use {@link ModelConstants#RELATIONSHIP} instead.
+ */
+ @Deprecated
public static final String PUBLICATION_DATASET = "publicationDataset";
public static final String IS_RELATED_TO = "isRelatedTo";
public static final String SUPPLEMENT = "supplement";
@@ -34,6 +39,12 @@ public class ModelConstants {
public static final String IS_PART_OF = "IsPartOf";
public static final String HAS_PARTS = "HasParts";
public static final String RELATIONSHIP = "relationship";
+ public static final String CITATION = "citation";
+ public static final String CITES = "cites";
+ public static final String IS_CITED_BY = "IsCitedBy";
+ public static final String REVIEW = "review";
+ public static final String REVIEWS = "reviews";
+ public static final String IS_REVIEWED_BY = "IsReviewedBy";
public static final String RESULT_PROJECT = "resultProject";
public static final String OUTCOME = "outcome";
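
Review note: a hedged sketch of how the new citation semantics would be used when building a relation (the `Relation` setters exist in `dhp-schemas`; the source/target identifiers are hypothetical):

```java
Relation rel = new Relation();
rel.setRelType(ModelConstants.RESULT_RESULT);  // relation between two results
rel.setSubRelType(ModelConstants.CITATION);    // new subreltype introduced above
rel.setRelClass(ModelConstants.CITES);         // inverse class: IS_CITED_BY
rel.setSource("50|sourceResultId");            // hypothetical identifiers
rel.setTarget("50|citedResultId");
```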
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
index 9ee7c2debc..7d8be81ac9 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
@@ -58,6 +58,18 @@ public class ModelSupport {
oafTypes.put("relation", Relation.class);
}
+ public static final Map<Class, String> idPrefixMap = Maps.newHashMap();
+
+ static {
+ idPrefixMap.put(Datasource.class, "10");
+ idPrefixMap.put(Organization.class, "20");
+ idPrefixMap.put(Project.class, "40");
+ idPrefixMap.put(Dataset.class, "50");
+ idPrefixMap.put(OtherResearchProduct.class, "50");
+ idPrefixMap.put(Software.class, "50");
+ idPrefixMap.put(Publication.class, "50");
+ }
+
public static final Map<String, String> entityIdPrefix = Maps.newHashMap();
static {
@@ -289,6 +301,10 @@ public class ModelSupport {
private ModelSupport() {
}
+ public static String getIdPrefix(Class clazz) {
+ return idPrefixMap.get(clazz);
+ }
+
/**
* Checks subclass-superclass relationship.
*
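
Review note: usage sketch for the new lookup; the prefixes mirror the id namespaces used across the graph (10 datasource, 20 organization, 40 project, 50 result):

```java
String pubPrefix = ModelSupport.getIdPrefix(Publication.class); // "50"
String dsPrefix = ModelSupport.getIdPrefix(Datasource.class);   // "10"
```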
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
index 07ddbb00e9..b5587c6b7e 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
@@ -10,6 +10,7 @@ public class Dataset extends Result implements Serializable {
private Field<String> storagedate;
+ // candidate for removal
private Field<String> device;
private Field<String> size;
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
index 2b7d3846c0..29d4952619 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
@@ -31,7 +31,7 @@ public class Instance implements Serializable {
// typed results
private Field<String> processingchargecurrency;
- private Field<String> refereed; // peer-review status
+ private Qualifier refereed; // peer-review status
public Field<String> getLicense() {
return license;
@@ -113,11 +113,11 @@ public class Instance implements Serializable {
this.processingchargecurrency = processingchargecurrency;
}
- public Field<String> getRefereed() {
+ public Qualifier getRefereed() {
return refereed;
}
- public void setRefereed(Field<String> refereed) {
+ public void setRefereed(Qualifier refereed) {
this.refereed = refereed;
}
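
Review note: a hedged sketch of how the retyped field would be populated under the new `dnet:review_levels` vocabulary (the classid/classname values are illustrative, not taken from this diff; `instance` is a hypothetical `Instance` object):

```java
Qualifier refereed = new Qualifier();
refereed.setClassid("0001");           // illustrative vocabulary code
refereed.setClassname("peerReviewed"); // illustrative label
refereed.setSchemeid(ModelConstants.DNET_REVIEW_LEVELS);
refereed.setSchemename(ModelConstants.DNET_REVIEW_LEVELS);
instance.setRefereed(refereed);
```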
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 9d723f9984..44737d46eb 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -2,8 +2,10 @@
package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.stream.Collectors;
public class Result extends OafEntity implements Serializable {
@@ -260,21 +262,29 @@ public class Result extends OafEntity implements Serializable {
StructuredProperty baseMainTitle = null;
if (title != null) {
baseMainTitle = getMainTitle(title);
- title.remove(baseMainTitle);
+ if (baseMainTitle != null) {
+ final StructuredProperty p = baseMainTitle;
+ title = title.stream().filter(t -> t != p).collect(Collectors.toList());
+ }
}
StructuredProperty newMainTitle = null;
if (r.getTitle() != null) {
newMainTitle = getMainTitle(r.getTitle());
- r.getTitle().remove(newMainTitle);
+ if (newMainTitle != null && title != null) {
+ final StructuredProperty p = newMainTitle;
+ title = title.stream().filter(t -> t != p).collect(Collectors.toList());
+ }
}
- if (newMainTitle != null && compareTrust(this, r) < 0)
+ if (newMainTitle != null && compareTrust(this, r) < 0) {
baseMainTitle = newMainTitle;
+ }
title = mergeLists(title, r.getTitle());
- if (title != null && baseMainTitle != null)
+ if (title != null && baseMainTitle != null) {
title.add(baseMainTitle);
+ }
relevantdate = mergeLists(relevantdate, r.getRelevantdate());
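
Review note: the switch from `List.remove(...)` to an identity-based stream filter is defensive: `remove(Object)` mutates the list in place, which throws on the immutable lists some deserializers produce, and its `equals(...)`-based matching could drop a different-but-equal title. A minimal illustration (`loadedTitles` and `mainTitle` are hypothetical):

```java
List<StructuredProperty> titles = Collections.unmodifiableList(loadedTitles);
// titles.remove(mainTitle);                    // would throw UnsupportedOperationException
List<StructuredProperty> rest = titles
	.stream()
	.filter(t -> t != mainTitle)                // reference comparison keeps equal duplicates
	.collect(Collectors.toList());              // collects into a new, mutable list
```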
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
index 40332bf53c..d25b5c9cec 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
@@ -10,8 +10,10 @@ public class Software extends Result implements Serializable {
private List<Field<String>> documentationUrl;
+ // candidate for removal
private List<StructuredProperty> license;
+ // candidate for removal
private Field<String> codeRepositoryUrl;
private Qualifier programmingLanguage;
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index ec6247102e..b50c6705b2 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
</parent>
<artifactId>dhp-actionmanager</artifactId>
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
index e55c0eb7b6..8ea877aec5 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
@@ -96,12 +96,21 @@ public class ProtoConverter implements Serializable {
.stream()
.distinct()
.collect(Collectors.toCollection(ArrayList::new)) : null);
- i.setRefereed(mapStringField(ri.getRefereed()));
+ i.setRefereed(mapRefereed(ri.getRefereed()));
i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount()));
i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency()));
return i;
}
+ private static Qualifier mapRefereed(FieldTypeProtos.StringField refereed) {
+ Qualifier q = new Qualifier();
+ q.setClassid(refereed.getValue());
+ q.setClassname(refereed.getValue());
+ q.setSchemeid("dnet:review_levels");
+ q.setSchemename("dnet:review_levels");
+ return q;
+ }
+
private static List<ExternalReference> convertExternalRefs(OafProtos.Oaf oaf) {
ResultProtos.Result r = oaf.getEntity().getResult();
if (r.getExternalReferenceCount() > 0) {
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 282ca476d5..c04910a583 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
</parent>
<artifactId>dhp-aggregation</artifactId>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
index 87bd3be3d8..c745219fea 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
@@ -8,6 +8,7 @@ import java.io.File;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -19,6 +20,7 @@ import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
+@Disabled
public class DnetCollectorWorkerApplicationTests {
private final ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class);
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index a3cc15b744..04d334cd75 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -3,7 +3,7 @@
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
- <version>1.2.2-SNAPSHOT</version>
+ <version>1.2.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 0e84d99a45..eddc042c6b 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -1,66 +1,70 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>dhp-workflows</artifactId>
- <groupId>eu.dnetlib.dhp</groupId>
- <version>1.2.2-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>dhp-workflows</artifactId>
+ <groupId>eu.dnetlib.dhp</groupId>
+ <version>1.2.3-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>

- <artifactId>dhp-broker-events</artifactId>
+ <artifactId>dhp-broker-events</artifactId>

- <dependencies>
+ <dependencies>

- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>

- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.11</artifactId>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_2.11</artifactId>
+ <scope>test</scope>
+ </dependency>

- <dependency>
- <groupId>eu.dnetlib.dhp</groupId>
- <artifactId>dhp-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>eu.dnetlib.dhp</groupId>
- <artifactId>dhp-schemas</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <dependency>
+ <groupId>eu.dnetlib.dhp</groupId>
+ <artifactId>dhp-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eu.dnetlib.dhp</groupId>
+ <artifactId>dhp-schemas</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eu.dnetlib</groupId>
+ <artifactId>dnet-pace-core</artifactId>
+ </dependency>

- <dependency>
- <groupId>com.jayway.jsonpath</groupId>
- <artifactId>json-path</artifactId>
- </dependency>
- <dependency>
- <groupId>dom4j</groupId>
- <artifactId>dom4j</artifactId>
- </dependency>
- <dependency>
- <groupId>jaxen</groupId>
- <artifactId>jaxen</artifactId>
- </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>dom4j</groupId>
+ <artifactId>dom4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>jaxen</groupId>
+ <artifactId>jaxen</artifactId>
+ </dependency>

- <dependency>
- <groupId>eu.dnetlib</groupId>
- <artifactId>dnet-openaire-broker-common</artifactId>
- <version>[2.0.0,3.0.0)</version>
- </dependency>
+ <dependency>
+ <groupId>eu.dnetlib</groupId>
+ <artifactId>dnet-openaire-broker-common</artifactId>
+ <version>[3.0.1,4.0.0)</version>
+ </dependency>

- </dependencies>
-</project>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/report.xml b/dhp-workflows/dhp-broker-events/report.xml
new file mode 100644
index 0000000000..6e706f7230
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/report.xml
@@ -0,0 +1,37 @@
+<?xml version='1.0'?>
+<dfxml xmloutputversion='1.0'>
+ <metadata xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns:dc='http://purl.org/dc/elements/1.1/'>
+ <dc:type>Feature Extraction</dc:type>
+ </metadata>
+ <creator version='1.0'>
+ <program>TCPFLOW</program>
+ <version>1.5.0</version>
+ <build_environment>
+ <compiler>4.2.1 (4.2.1 Compatible Apple LLVM 11.0.0 (clang-1100.0.33.8))</compiler>
+ <CPPFLAGS>-D_THREAD_SAFE -pthread -I/usr/local/include -I/usr/local/include -DUTC_OFFSET=+0000</CPPFLAGS>
+ <CFLAGS>-g -D_THREAD_SAFE -pthread -g -O3 -MD -Wpointer-arith -Wmissing-declarations -Wmissing-prototypes -Wshadow -Wwrite-strings -Wcast-align -Waggregate-return -Wbad-function-cast -Wcast-qual -Wundef -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wc++-compat -Wmissing-noreturn -Wall -Wstrict-prototypes -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wmissing-declarations -Wmissing-prototypes -Wshadow -Wwrite-strings -Wcast-align -Waggregate-return -Wbad-function-cast -Wcast-qual -Wundef -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wc++-compat -Wmissing-noreturn -Wall -Wstrict-prototypes</CFLAGS>
+ <CXXFLAGS>-g -D_THREAD_SAFE -pthread -g -O3 -Wall -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wshadow -Wwrite-strings -Wcast-align -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wmissing-noreturn -Woverloaded-virtual -Wsign-promo -funit-at-a-time -Weffc++ -std=c++11 -Wall -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wshadow -Wwrite-strings -Wcast-align -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wmissing-noreturn -Woverloaded-virtual -Wsign-promo -funit-at-a-time -Weffc++</CXXFLAGS>
+ <LDFLAGS>-L/usr/local/lib -L/usr/local/lib</LDFLAGS>
+ <LIBS>-lpython2.7 -lpython2.7 -lpcap -lbz2 -lexpat -lsqlite3 -lcrypto -lssl -lcrypto -ldl -lz</LIBS>
+ <compilation_date>2019-10-11T01:16:58</compilation_date>
+ </build_environment>
+ <execution_environment>
+ <os_sysname>Darwin</os_sysname>
+ <os_release>19.5.0</os_release>
+ <os_version>Darwin Kernel Version 19.5.0: Tue May 26 20:41:44 PDT 2020; root:xnu-6153.121.2~2/RELEASE_X86_64</os_version>
+ <host>Micheles-MBP.local</host>
+ <arch>x86_64</arch>
+ <command_line>tcpflow</command_line>
+ <uid>501</uid>
+ <username>michele</username>
+ <start_time>2020-06-15T14:55:03Z</start_time>
+ </execution_environment>
+ </creator>
+ <configuration>
+ </configuration>
+ <tdelta>0</tdelta>
+</dfxml>
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
index 0512a38134..f94d286e4f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
@@ -1,9 +1,15 @@
package eu.dnetlib.dhp.broker.model;
+import java.io.Serializable;
import java.util.Map;
-public class Event {
+public class Event implements Serializable {
+
+ private static final long serialVersionUID = -5936790326505781395L;
private String eventId;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 9e5d986448..bf4f62d243 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -6,18 +6,13 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class EventFactory {
@@ -37,15 +32,12 @@ public class EventFactory {
final Map<String, Object> map = createMapFromResult(updateInfo);
- final String payload = createPayload(updateInfo);
-
final String eventId = calculateEventId(
- updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
- updateInfo.getHighlightValueAsString());
+ updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId(), updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
- res.setPayload(payload);
+ res.setPayload(updateInfo.asBrokerPayload().toJSON());
res.setMap(map);
res.setTopic(updateInfo.getTopicPath());
res.setCreationDate(now);
@@ -54,65 +46,34 @@ public class EventFactory {
return res;
}
- private static String createPayload(final UpdateInfo<?> updateInfo) {
- final OpenAireEventPayload payload = new OpenAireEventPayload();
- // TODO
-
- updateInfo.compileHighlight(payload);
-
- return payload.toJSON();
- }
-
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
final Map<String, Object> map = new HashMap<>();
- final Result source = updateInfo.getSource();
- final Result target = updateInfo.getTarget();
+ final OpenaireBrokerResult source = updateInfo.getSource();
+ final OpenaireBrokerResult target = updateInfo.getTarget();
- final List<KeyValue> collectedFrom = target.getCollectedfrom();
- if (collectedFrom.size() == 1) {
- map.put("target_datasource_id", collectedFrom.get(0).getKey());
- map.put("target_datasource_name", collectedFrom.get(0).getValue());
- }
+ map.put("target_datasource_id", target.getCollectedFromId());
+ map.put("target_datasource_name", target.getCollectedFromName());
- final List<String> ids = target.getOriginalId();
- if (ids.size() > 0) {
- map.put("target_publication_id", ids.get(0));
- }
+ map.put("target_publication_id", target.getOriginalId());
- final List<StructuredProperty> titles = target.getTitle();
+ final List<String> titles = target.getTitles();
if (titles.size() > 0) {
map.put("target_publication_title", titles.get(0));
}
- final long date = parseDateTolong(target.getDateofacceptance().getValue());
+ final long date = parseDateTolong(target.getPublicationdate());
if (date > 0) {
map.put("target_dateofacceptance", date);
}
- final List<StructuredProperty> subjects = target.getSubject();
- if (subjects.size() > 0) {
- map
- .put(
- "target_publication_subject_list",
- subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
- }
-
- final List<Author> authors = target.getAuthor();
- if (authors.size() > 0) {
- map
- .put(
- "target_publication_author_list",
- authors.stream().map(Author::getFullname).collect(Collectors.toList()));
- }
+ map.put("target_publication_subject_list", target.getSubjects());
+ map.put("target_publication_author_list", target.getCreators());
// PROVENANCE INFO
map.put("trust", updateInfo.getTrust());
- final List<KeyValue> sourceCollectedFrom = source.getCollectedfrom();
- if (sourceCollectedFrom.size() == 1) {
- map.put("provenance_datasource_id", sourceCollectedFrom.get(0).getKey());
- map.put("provenance_datasource_name", sourceCollectedFrom.get(0).getValue());
- }
+ map.put("provenance_datasource_id", source.getCollectedFromId());
+ map.put("provenance_datasource_name", source.getCollectedFromName());
map.put("provenance_publication_id_list", source.getOriginalId());
return map;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index d5e577972d..940d7f9f32 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -3,59 +3,33 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
-import eu.dnetlib.dhp.broker.model.EventFactory;
-import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.EventFinder;
+import eu.dnetlib.dhp.broker.oa.util.EventGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
@@ -63,49 +37,23 @@ import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import scala.Tuple2;
public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
- // Simple Matchers
- private static final UpdateMatcher<Result, ?> enrichMissingAbstract = new EnrichMissingAbstract();
- private static final UpdateMatcher<Result, ?> enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
- private static final UpdateMatcher<Result, ?> enrichMissingOpenAccess = new EnrichMissingOpenAccess();
- private static final UpdateMatcher<Result, ?> enrichMissingPid = new EnrichMissingPid();
- private static final UpdateMatcher<Result, ?> enrichMissingPublicationDate = new EnrichMissingPublicationDate();
- private static final UpdateMatcher<Result, ?> enrichMissingSubject = new EnrichMissingSubject();
- private static final UpdateMatcher<Result, ?> enrichMoreOpenAccess = new EnrichMoreOpenAccess();
- private static final UpdateMatcher<Result, ?> enrichMorePid = new EnrichMorePid();
- private static final UpdateMatcher<Result, ?> enrichMoreSubject = new EnrichMoreSubject();
-
- // Advanced matchers
- private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMissingProject = new EnrichMissingProject();
- private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMoreProject = new EnrichMoreProject();
-
- private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMissingSoftware = new EnrichMissingSoftware();
- private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
-
- private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
- private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
- private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
- private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
- private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
-
- private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
- private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
- private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
- private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
- private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
-
- public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -120,20 +68,32 @@ public class GenerateEventsApplication {
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ final String dedupConfigProfileId = parser.get("dedupConfProfile");
+ log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
+
final SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(BrokerConstants.getModelClasses());
+
+ final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
- final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
removeOutputDir(spark, eventsPath);
- final JavaRDD<Event> eventsRdd = sc.emptyRDD();
-
- eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class));
- eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class));
- eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class));
- eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class));
-
- eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
+ spark
+ .emptyDataset(Encoders.kryo(Event.class))
+ .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
+ .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+ .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+ .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(eventsPath);
});
}
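
Review note: the job now uses Kryo instead of Java serialization, which matters for shuffle-heavy Spark jobs. A minimal sketch of the configuration pattern used above (assuming `BrokerConstants.getModelClasses()` returns the `Class[]` of model types to register):

```java
SparkConf conf = new SparkConf();
// Kryo is faster and more compact than Java serialization
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// registering classes up front avoids embedding full class names in every record
conf.registerKryoClasses(new Class[] { Event.class, Relation.class });
```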
@@ -142,130 +102,85 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
- private static <R extends Result> JavaRDD<Event> generateSimpleEvents(final SparkSession spark,
+ private static <SRC extends Result> Dataset<Event> generateEvents(
+ final SparkSession spark,
final String graphPath,
- final Class<R> resultClazz) {
+ final Class<SRC> sourceClass,
+ final DedupConfig dedupConfig) {
- final Dataset<R> results = readPath(
- spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
- .filter(r -> r.getDataInfo().getDeletedbyinference());
+ final Dataset<OpenaireBrokerResult> results = expandResultsWithRelations(spark, graphPath, sourceClass);
- final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
+ final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
- final Column c = null; // TODO
+ final TypedColumn<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup> aggr = new ResultAggregator()
+ .toColumn();
- final Dataset<Row> aa = results
- .joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
- .groupBy(rels.col("target"))
- .agg(c)
- .filter(x -> x.size() > 1)
- // generateSimpleEvents(...)
- // flatMap()
- // toRdd()
+ return results
+ .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
+ .groupByKey(
+ (MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
+ .agg(aggr)
+ .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
+ .filter(ResultGroup::isValid)
+ .map(
+ (MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
+ Encoders.kryo(EventGroup.class))
+ .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
+ }
+
+ private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
+ final SparkSession spark,
+ final String graphPath,
+ final Class<SRC> sourceClass) {
+ final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
+ final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
+ spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
+ final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
+
+ final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
+
+ final Dataset<OpenaireBrokerResult> r0 = readPath(
+ spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
+ .filter(r -> r.getDataInfo().getDeletedbyinference())
+ .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
+
+ final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
+ final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
+ final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
+ final Dataset<OpenaireBrokerResult> r4 = join(
+ r3, rels, relatedEntities(publications, rels, RelatedProject.class));
- return null;
-
+ return r4;
}
- private List<Event> generateSimpleEvents(final Collection<Result> children) {
- final List<UpdateInfo<?>> list = new ArrayList<>();
-
- for (final Result target : children) {
- list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children));
- list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children));
- list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children));
- list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children));
- list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children));
- list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children));
- list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children));
- list.addAll(enrichMorePid.searchUpdatesForRecord(target, children));
- list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children));
- }
-
- return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+ private static Dataset
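
Review note: the section is truncated above, but the grouping step follows Spark's standard typed-`Aggregator` pattern: an `Aggregator<IN, BUF, OUT>` turned into a `TypedColumn` and applied after `groupByKey`. A self-contained sketch of that pattern with plain strings (the broker's `ResultAggregator` itself is not shown in this diff):

```java
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// folds grouped values into one buffer, analogous to how ResultAggregator
// folds Tuple2<OpenaireBrokerResult, Relation> pairs into a ResultGroup
class ConcatAggregator extends Aggregator<String, String, String> {
	@Override public String zero() { return ""; }
	@Override public String reduce(String buf, String in) { return buf.isEmpty() ? in : buf + "|" + in; }
	@Override public String merge(String b1, String b2) { return b1.isEmpty() ? b2 : b1 + "|" + b2; }
	@Override public String finish(String buf) { return buf; }
	@Override public Encoder<String> bufferEncoder() { return Encoders.STRING(); }
	@Override public Encoder<String> outputEncoder() { return Encoders.STRING(); }
}
// usage: ds.groupByKey(keyFn, Encoders.STRING()).agg(new ConcatAggregator().toColumn())
```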