diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 8bae191d3..2a89a26fd 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp-build</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 	</parent>
 	<artifactId>dhp-build-assembly-resources</artifactId>
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index ad8cd57b4..5be114e3c 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp-build</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 	</parent>
 	<artifactId>dhp-build-properties-maven-plugin</artifactId>
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 08f5de9ee..515ed35ce 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
 	<groupId>eu.dnetlib.dhp</groupId>
 	<artifactId>dhp-code-style</artifactId>
-	<version>1.2.2-SNAPSHOT</version>
+	<version>1.2.3-SNAPSHOT</version>

 	<packaging>jar</packaging>
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index 369e25b24..d2dcbc36e 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 	</parent>
 	<artifactId>dhp-build</artifactId>
 	<packaging>pom</packaging>
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 60e66f45a..0e7652dd3 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 		<relativePath>../</relativePath>
 	</parent>
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
index e793e3f29..c6c9d8044 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.common;
import java.io.Serializable;
+import java.util.function.Consumer;
import java.util.function.Supplier;
/** Provides serializable and throwing extensions to standard functional interfaces. */
@@ -10,6 +11,16 @@ public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
}
+	/**
+	 * Serializable consumer of any kind of objects. To be used within spark processing pipelines when supplying
+	 * functions externally.
+	 *
+	 * @param <T>
+	 */
+	@FunctionalInterface
+	public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
+	}
+
/**
* Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying
* functions externally.
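
The new SerializableConsumer mirrors the existing SerializableSupplier: it lets a lambda cross a Spark
serialization boundary. A minimal usage sketch (the consumed value is a placeholder):

    SerializableConsumer<String> printer = s -> System.out.println("processing: " + s);
    printer.accept("record-1");
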
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
index 1909ddcca..6e02ca614 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
@@ -16,6 +16,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
+/**
+ * PacePerson tries to derive information from the fullname string of an author. Such information includes names,
+ * surnames and the fullname split into terms. It also provides an additional field for the original data. The
+ * calculation of the names and the surnames is not always possible: when it is impossible to assert which are the
+ * names and which the surnames, the lists are left empty.
+ */
public class PacePerson {
private static final String UTF8 = "UTF-8";
@@ -26,10 +32,19 @@ public class PacePerson {
 	private static Set<String> particles = null;
+	/**
+	 * Capitalizes a string.
+	 *
+	 * @param s the string to capitalize
+	 * @return the capitalized string
+	 */
public static final String capitalize(final String s) {
return WordUtils.capitalize(s.toLowerCase(), ' ', '-');
}
+	/**
+	 * Appends a dot to a single-character string (an initial).
+	 */
public static final String dotAbbreviations(final String s) {
return s.length() == 1 ? s + "." : s;
}
@@ -46,6 +61,12 @@ public class PacePerson {
return h;
}
+	/**
+	 * The constructor of the class. It fills the fields of the class based on the input fullname.
+	 *
+	 * @param s the input string (fullname of the author)
+	 * @param aggressive sets the string normalization type
+	 */
public PacePerson(String s, final boolean aggressive) {
original = s;
s = Normalizer.normalize(s, Normalizer.Form.NFD);
@@ -64,6 +85,7 @@ public class PacePerson {
// s = s.replaceAll("[\\W&&[^,-]]", "");
}
+		// if the string contains a comma, surname and name can be derived directly by splitting on it
if (s.contains(",")) {
final String[] arr = s.split(",");
if (arr.length == 1) {
@@ -74,21 +96,23 @@ public class PacePerson {
fullname.addAll(surname);
fullname.addAll(name);
}
- } else {
+		} else { // otherwise, surname and name must be derived from all-caps terms and initials
fullname = splitTerms(s);
int lastInitialPosition = fullname.size();
boolean hasSurnameInUpperCase = false;
+ // computes lastInitialPosition and hasSurnameInUpperCase
for (int i = 0; i < fullname.size(); i++) {
final String term = fullname.get(i);
if (term.length() == 1) {
- lastInitialPosition = i;
+					lastInitialPosition = i; // records the position of the last single-character term (an initial)
} else if (term.equals(term.toUpperCase())) {
- hasSurnameInUpperCase = true;
+					hasSurnameInUpperCase = true; // at least one term is all caps: likely the surname
}
}
+			// handles the specific fullname patterns below
if (lastInitialPosition < fullname.size() - 1) { // Case: Michele G. Artini
name = fullname.subList(0, lastInitialPosition + 1);
surname = fullname.subList(lastInitialPosition + 1, fullname.size());
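
A quick sketch of the two derivation paths implemented above (the first is covered by the new unit test
below; the second relies on the CAPS heuristic and is shown only as an assumed example):

    PacePerson p1 = new PacePerson("Artini, Michele", false); // comma present: direct surname/name split
    PacePerson p2 = new PacePerson("ARTINI Michele", false);  // no comma: the all-caps term is taken as surname
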
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java
new file mode 100644
index 000000000..5ebd7213e
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java
@@ -0,0 +1,27 @@
+
+package eu.dnetlib.dhp.common;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+public class PacePersonTest {
+
+ @Test
+ public void pacePersonTest1() {
+
+ PacePerson p = new PacePerson("Artini, Michele", false);
+ assertEquals("Artini", p.getSurnameString());
+ assertEquals("Michele", p.getNameString());
+ assertEquals("Artini, Michele", p.getNormalisedFullname());
+ }
+
+ @Test
+ public void pacePersonTest2() {
+ PacePerson p = new PacePerson("Michele G. Artini", false);
+ assertEquals("Artini, Michele G.", p.getNormalisedFullname());
+ assertEquals("Michele G", p.getNameString());
+ assertEquals("Artini", p.getSurnameString());
+ }
+
+}
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 5e864cf94..56fb8ead2 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 		<relativePath>../</relativePath>
 	</parent>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index e32dd10fa..a3c1610db 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -14,6 +14,7 @@ public class ModelConstants {
public static final String DNET_DATA_CITE_RESOURCE = "dnet:dataCite_resource";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public static final String DNET_COUNTRY_TYPE = "dnet:countries";
+ public static final String DNET_REVIEW_LEVELS = "dnet:review_levels";
public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
public static final String SYSIMPORT_CROSSWALK_ENTITYREGISTRY = "sysimport:crosswalk:entityregistry";
@@ -25,6 +26,10 @@ public class ModelConstants {
public static final String ORP_RESULTTYPE_CLASSID = "other";
public static final String RESULT_RESULT = "resultResult";
+ /**
+ * @deprecated Use {@link ModelConstants#RELATIONSHIP} instead.
+ */
+ @Deprecated
public static final String PUBLICATION_DATASET = "publicationDataset";
public static final String IS_RELATED_TO = "isRelatedTo";
public static final String SUPPLEMENT = "supplement";
@@ -34,6 +39,12 @@ public class ModelConstants {
public static final String IS_PART_OF = "IsPartOf";
public static final String HAS_PARTS = "HasParts";
public static final String RELATIONSHIP = "relationship";
+ public static final String CITATION = "citation";
+ public static final String CITES = "cites";
+ public static final String IS_CITED_BY = "IsCitedBy";
+ public static final String REVIEW = "review";
+ public static final String REVIEWS = "reviews";
+ public static final String IS_REVIEWED_BY = "IsReviewedBy";
public static final String RESULT_PROJECT = "resultProject";
public static final String OUTCOME = "outcome";
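
Each new sub-relation vocabulary pairs a subRelType (citation, review) with a forward and an inverse
relation class. A minimal sketch of how the constants would be wired into a Relation (standard setters
from eu.dnetlib.dhp.schema.oaf.Relation; the source/target ids are placeholders):

    final Relation r = new Relation();
    r.setRelType(ModelConstants.RESULT_RESULT);
    r.setSubRelType(ModelConstants.CITATION);
    r.setRelClass(ModelConstants.CITES); // the inverse relation uses ModelConstants.IS_CITED_BY
    r.setSource("50|sourceResultId");
    r.setTarget("50|targetResultId");
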
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
index 2b7d3846c..29d495261 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
@@ -31,7 +31,7 @@ public class Instance implements Serializable {
// typed results
 	private Field<String> processingchargecurrency;

-	private Field<String> refereed; // peer-review status
+	private Qualifier refereed; // peer-review status
 	public Field<String> getLicense() {
return license;
@@ -113,11 +113,11 @@ public class Instance implements Serializable {
this.processingchargecurrency = processingchargecurrency;
}
-	public Field<String> getRefereed() {
+ public Qualifier getRefereed() {
return refereed;
}
-	public void setRefereed(Field<String> refereed) {
+ public void setRefereed(Qualifier refereed) {
this.refereed = refereed;
}
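
With refereed modelled as a Qualifier, the peer-review status becomes a term of the dnet:review_levels
vocabulary introduced in ModelConstants. A sketch (the "0001"/"peerReviewed" term is an assumption about
the vocabulary content):

    final Qualifier refereed = new Qualifier();
    refereed.setClassid("0001"); // assumed vocabulary code
    refereed.setClassname("peerReviewed");
    refereed.setSchemeid(ModelConstants.DNET_REVIEW_LEVELS);
    refereed.setSchemename(ModelConstants.DNET_REVIEW_LEVELS);
    instance.setRefereed(refereed);
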
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 09e77a244..c9d0ac7c7 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -254,28 +254,25 @@ public class Result extends OafEntity implements Serializable {
final StructuredProperty p = baseMainTitle;
title = title.stream().filter(t -> t != p).collect(Collectors.toList());
}
-//
-//
-// title.remove(baseMainTitle);
}
StructuredProperty newMainTitle = null;
if (r.getTitle() != null) {
newMainTitle = getMainTitle(r.getTitle());
- if (newMainTitle != null) {
+ if (newMainTitle != null && title != null) {
final StructuredProperty p = newMainTitle;
title = title.stream().filter(t -> t != p).collect(Collectors.toList());
}
-
- // r.getTitle().remove(newMainTitle);
}
- if (newMainTitle != null && compareTrust(this, r) < 0)
+ if (newMainTitle != null && compareTrust(this, r) < 0) {
baseMainTitle = newMainTitle;
+ }
title = mergeLists(title, r.getTitle());
- if (title != null && baseMainTitle != null)
+ if (title != null && baseMainTitle != null) {
title.add(baseMainTitle);
+ }
relevantdate = mergeLists(relevantdate, r.getRelevantdate());
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index ec6247102..b50c6705b 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp-workflows</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 	</parent>
 	<artifactId>dhp-actionmanager</artifactId>
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
index e55c0eb7b..8ea877aec 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
@@ -96,12 +96,21 @@ public class ProtoConverter implements Serializable {
.stream()
.distinct()
.collect(Collectors.toCollection(ArrayList::new)) : null);
- i.setRefereed(mapStringField(ri.getRefereed()));
+ i.setRefereed(mapRefereed(ri.getRefereed()));
i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount()));
i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency()));
return i;
}
+ private static Qualifier mapRefereed(FieldTypeProtos.StringField refereed) {
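+		// wraps the plain string value in a Qualifier typed with the dnet:review_levels vocabulary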
+ Qualifier q = new Qualifier();
+ q.setClassid(refereed.getValue());
+		q.setClassname(refereed.getValue());
+ q.setSchemeid("dnet:review_levels");
+ q.setSchemename("dnet:review_levels");
+ return q;
+ }
+
 	private static List<ExternalReference> convertExternalRefs(OafProtos.Oaf oaf) {
ResultProtos.Result r = oaf.getEntity().getResult();
if (r.getExternalReferenceCount() > 0) {
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 9f082df70..fdb4467d6 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>eu.dnetlib.dhp</groupId>
 		<artifactId>dhp-workflows</artifactId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 	</parent>
 	<artifactId>dhp-aggregation</artifactId>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
index 87bd3be3d..c745219fe 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
@@ -8,6 +8,7 @@ import java.io.File;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -19,6 +20,7 @@ import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
+@Disabled
public class DnetCollectorWorkerApplicationTests {
private final ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class);
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index a3cc15b74..04d334cd7 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -3,7 +3,7 @@
 	<parent>
 		<artifactId>dhp-workflows</artifactId>
 		<groupId>eu.dnetlib.dhp</groupId>
-		<version>1.2.2-SNAPSHOT</version>
+		<version>1.2.3-SNAPSHOT</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 59ff05ed3..f060a60bc 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -1,66 +1,70 @@
-	<parent>
-		<artifactId>dhp-workflows</artifactId>
-		<groupId>eu.dnetlib.dhp</groupId>
-		<version>1.2.2-SNAPSHOT</version>
-	</parent>
-	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>dhp-workflows</artifactId>
+		<groupId>eu.dnetlib.dhp</groupId>
+		<version>1.2.3-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>

-	<artifactId>dhp-broker-events</artifactId>
+	<artifactId>dhp-broker-events</artifactId>

-	<dependencies>
+	<dependencies>

-		<dependency>
-			<groupId>commons-io</groupId>
-			<artifactId>commons-io</artifactId>
-		</dependency>
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+		</dependency>

-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-core_2.11</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-sql_2.11</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-hive_2.11</artifactId>
-			<scope>test</scope>
-		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-core_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-sql_2.11</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.spark</groupId>
+			<artifactId>spark-hive_2.11</artifactId>
+			<scope>test</scope>
+		</dependency>

-		<dependency>
-			<groupId>eu.dnetlib.dhp</groupId>
-			<artifactId>dhp-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>eu.dnetlib.dhp</groupId>
-			<artifactId>dhp-schemas</artifactId>
-			<version>${project.version}</version>
-		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib.dhp</groupId>
+			<artifactId>dhp-schemas</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-pace-core</artifactId>
+		</dependency>

-		<dependency>
-			<groupId>com.jayway.jsonpath</groupId>
-			<artifactId>json-path</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>dom4j</groupId>
-			<artifactId>dom4j</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>jaxen</groupId>
-			<artifactId>jaxen</artifactId>
-		</dependency>
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>dom4j</groupId>
+			<artifactId>dom4j</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>jaxen</groupId>
+			<artifactId>jaxen</artifactId>
+		</dependency>

-		<dependency>
-			<groupId>eu.dnetlib</groupId>
-			<artifactId>dnet-openaire-broker-common</artifactId>
-			<version>[2.0.1,3.0.0)</version>
-		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-openaire-broker-common</artifactId>
+			<version>[2.0.1,3.0.0)</version>
+		</dependency>

-	</dependencies>
-</project>
+	</dependencies>
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index df33fae0d..9146cf422 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -37,7 +37,7 @@ public class EventFactory {
 		final Map<String, Object> map = createMapFromResult(updateInfo);
final String eventId = calculateEventId(
- updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
+ updateInfo.getTopicPath(), updateInfo.getTarget().getResult().getOriginalId().get(0),
updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
@@ -54,8 +54,8 @@ public class EventFactory {
 	private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
 		final Map<String, Object> map = new HashMap<>();
- final Result source = updateInfo.getSource();
- final Result target = updateInfo.getTarget();
+ final Result source = updateInfo.getSource().getResult();
+ final Result target = updateInfo.getTarget().getResult();
 		final List<KeyValue> collectedFrom = target.getCollectedfrom();
if (collectedFrom.size() == 1) {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index 05fab47f0..4d40ba80d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -3,15 +3,9 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -26,94 +20,38 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
-import eu.dnetlib.dhp.broker.model.EventFactory;
-import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
-import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
+import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
-import eu.dnetlib.dhp.broker.oa.util.ResultAggregator;
-import eu.dnetlib.dhp.broker.oa.util.ResultGroup;
-import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelationsAggregator;
import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
-	// Simple Matchers
-	private static final UpdateMatcher<Result, ?> enrichMissingAbstract = new EnrichMissingAbstract();
-	private static final UpdateMatcher<Result, ?> enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
-	private static final UpdateMatcher<Result, ?> enrichMissingOpenAccess = new EnrichMissingOpenAccess();
-	private static final UpdateMatcher<Result, ?> enrichMissingPid = new EnrichMissingPid();
-	private static final UpdateMatcher<Result, ?> enrichMissingPublicationDate = new EnrichMissingPublicationDate();
-	private static final UpdateMatcher<Result, ?> enrichMissingSubject = new EnrichMissingSubject();
-	private static final UpdateMatcher<Result, ?> enrichMoreOpenAccess = new EnrichMoreOpenAccess();
-	private static final UpdateMatcher<Result, ?> enrichMorePid = new EnrichMorePid();
-	private static final UpdateMatcher<Result, ?> enrichMoreSubject = new EnrichMoreSubject();
-
-	// Advanced matchers
-	private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMissingProject = new EnrichMissingProject();
-	private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMoreProject = new EnrichMoreProject();
-
-	private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMissingSoftware = new EnrichMissingSoftware();
-	private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
-
-	private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
-	private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
-	private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
-	private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
-	private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
-
-	private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
-	private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
-	private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
-	private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
-	private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
-
-	public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
- .toString(GenerateEventsApplication.class
- .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
+ .toString(
+ GenerateEventsApplication.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -128,8 +66,16 @@ public class GenerateEventsApplication {
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+
+ final String dedupConfigProfileId = parser.get("dedupConfProfile");
+ log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
+
final SparkConf conf = new SparkConf();
+ final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
+
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, eventsPath);
@@ -137,14 +83,10 @@ public class GenerateEventsApplication {
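+			// events are generated separately for each result type, then unioned into a single dataset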
-		final Dataset<Event> all = spark.emptyDataset(Encoders.kryo(Event.class));
+		Dataset<Event> all = spark.emptyDataset(Encoders.kryo(Event.class));

 		for (final Class<? extends Result> r1 : BrokerConstants.RESULT_CLASSES) {
-			all.union(generateSimpleEvents(spark, graphPath, r1));
-
-			for (final Class<? extends Result> r2 : BrokerConstants.RESULT_CLASSES) {
-				all.union(generateRelationEvents(spark, graphPath, r1, r2));
-			}
+			all = all.union(generateEvents(spark, graphPath, r1, dedupConfig));
 		}
- all.write().mode(SaveMode.Overwrite).json(eventsPath);
+ all.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(eventsPath);
});
}
@@ -153,155 +95,84 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
-	private static <R extends Result> Dataset<Event> generateSimpleEvents(final SparkSession spark,
-		final String graphPath,
-		final Class<R> resultClazz) {
-
-		final Dataset<Result> results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class)
-			.filter(r -> r.getDataInfo().getDeletedbyinference());
-
-		final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
-			.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
-
-		final TypedColumn<Tuple2<Result, Relation>, ResultGroup> aggr = new ResultAggregator().toColumn();
-
-		return results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
-			.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
-			.agg(aggr)
-			.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
-			.filter(ResultGroup::isValid)
-			.map((MapFunction<ResultGroup, EventGroup>) g -> GenerateEventsApplication.generateSimpleEvents(g), Encoders.kryo(EventGroup.class))
-			.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
-	}
-
-	private static EventGroup generateSimpleEvents(final ResultGroup results) {
-		final List<UpdateInfo<?>> list = new ArrayList<>();
-
-		for (final Result target : results.getData()) {
-			list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData()));
-			list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData()));
-		}
-
-		final EventGroup events = new EventGroup();
-		list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
-		return events;
-	}
-
-	private static Dataset<Event> generateRelationEvents(final SparkSession spark,
+	private static Dataset<Event> generateEvents(
+		final SparkSession spark,
 		final String graphPath,
 		final Class<? extends Result> sourceClass,
-		final Class<? extends Result> targetClass) {
+		final DedupConfig dedupConfig) {

-		final Dataset<? extends Result> sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
-			.filter(r -> r.getDataInfo().getDeletedbyinference());
-
-		final Dataset<? extends Result> targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
+		final Dataset<ResultWithRelations> results = expandResultsWithRelations(spark, graphPath, sourceClass);

 		final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
 			.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
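+		// the isMergedIn relations produced by dedup link each result to its duplicates: grouping by the
+		// dedup target collects each record together with its duplicates, so they can be compared pairwise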
+		final TypedColumn<Tuple2<ResultWithRelations, Relation>, ResultGroup> aggr = new ResultAggregator()
+			.toColumn();
+
+		return results
+			.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
+			.groupByKey(
+				(MapFunction<Tuple2<ResultWithRelations, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
+			.agg(aggr)
+			.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
+			.filter(ResultGroup::isValid)
+			.map(
+				(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
+				Encoders.kryo(EventGroup.class))
+			.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
+	}
+
+	private static Dataset<ResultWithRelations> expandResultsWithRelations(
+		final SparkSession spark,
+		final String graphPath,
+		final Class<? extends Result> sourceClass) {
+		final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
+		final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
+			spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+		final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
+		final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
+
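+		// the graph relations (excluding the dedup ones) attach related projects, softwares, datasets
+		// and publications to each result, one join per related entity type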
 		final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
 			.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));

-		if (targetClass == Project.class) {
-			// TODO join using: generateProjectsEvents
-		} else if (targetClass == Software.class) {
-			// TODO join using: generateSoftwareEvents
-		} else if (targetClass == Publication.class) {
-			// TODO join using: generatePublicationRelatedEvents
-		} else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) {
-			// TODO join using: generateDatasetRelatedEvents
-		}
+		final Dataset<ResultWithRelations> r0 = readPath(
+			spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
+				.filter(r -> r.getDataInfo().getDeletedbyinference())
+				.map(r -> new ResultWithRelations(r), Encoders.kryo(ResultWithRelations.class));
+		final Dataset<ResultWithRelations> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
+		final Dataset<ResultWithRelations> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedSoftware.class));
+		final Dataset<ResultWithRelations> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedDataset.class));
+		final Dataset<ResultWithRelations> r4 = join(r3, rels, relatedEntities(publications, rels, RelatedPublication.class));

-		return null;
+		return r4;
 	}
-	private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects) {
-		final List<UpdateInfo<?>> list = new ArrayList<>();
-
-		for (final Pair<Result, List<Project>> target : childrenWithProjects) {
-			list.addAll(enrichMissingProject.searchUpdatesForRecord(target, childrenWithProjects));
-			list.addAll(enrichMoreProject.searchUpdatesForRecord(target, childrenWithProjects));
-		}
-
-		return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+ private static Dataset