diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 327c33d6f..8bae191d3 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-build-assembly-resources
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 873046e08..ad8cd57b4 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-build-properties-maven-plugin
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 8099a72e4..08f5de9ee 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-code-style
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
jar
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index a700a2918..369e25b24 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-build
pom
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index c7cb11b08..60e66f45a 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
../
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
similarity index 98%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
index d1c615dcd..1909ddcca 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
@@ -1,7 +1,6 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.Normalizer;
import java.util.HashSet;
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index fe5d0c431..5e864cf94 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
../
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/LicenseComparator.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/LicenseComparator.java
new file mode 100644
index 000000000..db523ad1a
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/LicenseComparator.java
@@ -0,0 +1,69 @@
+
+package eu.dnetlib.dhp.schema.common;
+
+import java.util.Comparator;
+
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+
+public class LicenseComparator implements Comparator {
+
+ @Override
+ public int compare(Qualifier left, Qualifier right) {
+
+ if (left == null && right == null)
+ return 0;
+ if (left == null)
+ return 1;
+ if (right == null)
+ return -1;
+
+ String lClass = left.getClassid();
+ String rClass = right.getClassid();
+
+ if (lClass.equals(rClass))
+ return 0;
+
+ if (lClass.equals("OPEN SOURCE"))
+ return -1;
+ if (rClass.equals("OPEN SOURCE"))
+ return 1;
+
+ if (lClass.equals("OPEN"))
+ return -1;
+ if (rClass.equals("OPEN"))
+ return 1;
+
+ if (lClass.equals("6MONTHS"))
+ return -1;
+ if (rClass.equals("6MONTHS"))
+ return 1;
+
+ if (lClass.equals("12MONTHS"))
+ return -1;
+ if (rClass.equals("12MONTHS"))
+ return 1;
+
+ if (lClass.equals("EMBARGO"))
+ return -1;
+ if (rClass.equals("EMBARGO"))
+ return 1;
+
+ if (lClass.equals("RESTRICTED"))
+ return -1;
+ if (rClass.equals("RESTRICTED"))
+ return 1;
+
+ if (lClass.equals("CLOSED"))
+ return -1;
+ if (rClass.equals("CLOSED"))
+ return 1;
+
+ if (lClass.equals("UNKNOWN"))
+ return -1;
+ if (rClass.equals("UNKNOWN"))
+ return 1;
+
+ // Else (but unlikely), lexicographical ordering will do.
+ return lClass.compareTo(rClass);
+ }
+}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index accc06d12..e32dd10fa 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -13,6 +13,7 @@ public class ModelConstants {
public static final String DNET_DATA_CITE_DATE = "dnet:dataCite_date";
public static final String DNET_DATA_CITE_RESOURCE = "dnet:dataCite_resource";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
+ public static final String DNET_COUNTRY_TYPE = "dnet:countries";
public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
public static final String SYSIMPORT_CROSSWALK_ENTITYREGISTRY = "sysimport:crosswalk:entityregistry";
@@ -49,6 +50,13 @@ public class ModelConstants {
public static final String HAS_PARTICIPANT = "hasParticipant";
public static final String IS_PARTICIPANT = "isParticipant";
+ public static final String RESULT_ORGANIZATION = "resultOrganization";
+ public static final String AFFILIATION = "affiliation";
+ public static final String IS_AUTHOR_INSTITUTION_OF = "isAuthorInstitutionOf";
+ public static final String HAS_AUTHOR_INSTITUTION = "hasAuthorInstitution";
+
+ public static final String MERGES = "merges";
+
public static final String UNKNOWN = "UNKNOWN";
public static final String NOT_AVAILABLE = "not available";
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
index fc85b1ac1..9ee7c2deb 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
@@ -1,10 +1,15 @@
package eu.dnetlib.dhp.schema.common;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
+import org.apache.commons.lang3.StringUtils;
+
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -379,6 +384,21 @@ public class ModelSupport {
entityMapping.get(EntityType.valueOf(targetType)).name());
}
+ public static String tableIdentifier(String dbName, String tableName) {
+
+ checkArgument(StringUtils.isNotBlank(dbName), "DB name cannot be empty");
+ checkArgument(StringUtils.isNotBlank(tableName), "table name cannot be empty");
+
+ return String.format("%s.%s", dbName, tableName);
+ }
+
+ public static String tableIdentifier(String dbName, Class clazz) {
+
+ checkArgument(Objects.nonNull(clazz), "clazz is needed to derive the table name, thus cannot be null");
+
+ return tableIdentifier(dbName, clazz.getSimpleName().toLowerCase());
+ }
+
public static Function idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java
index cc77e1ea0..9d572ee30 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java
@@ -8,7 +8,7 @@ public class DataInfo implements Serializable {
private Boolean invisible = false;
private Boolean inferred;
- private Boolean deletedbyinference;
+ private Boolean deletedbyinference = false;
private String trust;
private String inferenceprovenance;
private Qualifier provenanceaction;
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
index 1a85c6842..8358bc4b3 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
+import java.util.Objects;
public class Field implements Serializable {
@@ -39,6 +40,6 @@ public class Field implements Serializable {
if (getClass() != obj.getClass())
return false;
Field other = (Field) obj;
- return getValue().equals(other.getValue());
+ return Objects.equals(getValue(), other.getValue());
}
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
index 09742748d..2823ee49d 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
@@ -106,6 +106,7 @@ public abstract class OafEntity extends Oaf implements Serializable {
.stream(lists)
.filter(Objects::nonNull)
.flatMap(List::stream)
+ .filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 711b1ca68..213a585a8 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -244,7 +244,25 @@ public class Result extends OafEntity implements Serializable {
subject = mergeLists(subject, r.getSubject());
+ //merge title lists: main title with higher trust and distinct between the others
+ StructuredProperty baseMainTitle = null;
+ if(title != null) {
+ baseMainTitle = getMainTitle(title);
+ title.remove(baseMainTitle);
+ }
+
+ StructuredProperty newMainTitle = null;
+ if(r.getTitle() != null) {
+ newMainTitle = getMainTitle(r.getTitle());
+ r.getTitle().remove(newMainTitle);
+ }
+
+ if (newMainTitle != null && compareTrust(this, r) < 0 )
+ baseMainTitle = newMainTitle;
+
title = mergeLists(title, r.getTitle());
+ if (title != null && baseMainTitle != null)
+ title.add(baseMainTitle);
relevantdate = mergeLists(relevantdate, r.getRelevantdate());
@@ -294,4 +312,14 @@ public class Result extends OafEntity implements Serializable {
}
return a.size() > b.size() ? a : b;
}
+
+ private StructuredProperty getMainTitle(List titles) {
+ //need to check if the list of titles contains more than 1 main title? (in that case, we should chose which main title select in the list)
+ for (StructuredProperty title: titles) {
+ if (title.getQualifier() != null && title.getQualifier().getClassid() != null)
+ if (title.getQualifier().getClassid().equals("main title"))
+ return title;
+ }
+ return null;
+ }
}
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index 22a81f7da..ec6247102 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-actionmanager
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
index 90d573ac0..e55c0eb7b 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
@@ -523,7 +523,9 @@ public class ProtoConverter implements Serializable {
}
private static Context mapContext(ResultProtos.Result.Context context) {
-
+ if (context == null || StringUtils.isBlank(context.getId())) {
+ return null;
+ }
final Context entity = new Context();
entity.setId(context.getId());
entity
@@ -537,6 +539,10 @@ public class ProtoConverter implements Serializable {
}
public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) {
+ if (kv == null || StringUtils.isBlank(kv.getKey()) & StringUtils.isBlank(kv.getValue())) {
+ return null;
+ }
+
final KeyValue keyValue = new KeyValue();
keyValue.setKey(kv.getKey());
keyValue.setValue(kv.getValue());
@@ -575,6 +581,10 @@ public class ProtoConverter implements Serializable {
}
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
+ if (sp == null | StringUtils.isBlank(sp.getValue())) {
+ return null;
+ }
+
final StructuredProperty structuredProperty = new StructuredProperty();
structuredProperty.setValue(sp.getValue());
structuredProperty.setQualifier(mapQualifier(sp.getQualifier()));
@@ -611,6 +621,10 @@ public class ProtoConverter implements Serializable {
}
public static Field mapStringField(FieldTypeProtos.StringField s) {
+ if (s == null || StringUtils.isBlank(s.getValue())) {
+ return null;
+ }
+
final Field stringField = new Field<>();
stringField.setValue(s.getValue());
stringField.setDataInfo(mapDataInfo(s.getDataInfo()));
@@ -618,19 +632,16 @@ public class ProtoConverter implements Serializable {
}
public static Field mapBoolField(FieldTypeProtos.BoolField b) {
+ if (b == null) {
+ return null;
+ }
+
final Field booleanField = new Field<>();
booleanField.setValue(b.getValue());
booleanField.setDataInfo(mapDataInfo(b.getDataInfo()));
return booleanField;
}
- public static Field mapIntField(FieldTypeProtos.IntField b) {
- final Field entity = new Field<>();
- entity.setValue(b.getValue());
- entity.setDataInfo(mapDataInfo(b.getDataInfo()));
- return entity;
- }
-
public static Journal mapJournal(FieldTypeProtos.Journal j) {
final Journal journal = new Journal();
journal.setConferencedate(j.getConferencedate());
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 1c5465c14..9f082df70 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-aggregation
diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index 37abc22f6..a3cc15b74 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -1,11 +1,9 @@
-
+
dhp-workflows
eu.dnetlib.dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
4.0.0
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
index 0ef59e8c2..b4bcc509e 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareMergedRelationJob {
@@ -56,6 +57,7 @@ public class PrepareMergedRelationJob {
conf,
isSparkSessionManaged,
spark -> {
+ removeOutputDir(spark, outputPath);
selectMergesRelations(
spark,
inputPath,
@@ -84,4 +86,9 @@ public class PrepareMergedRelationJob {
(MapFunction) value -> OBJECT_MAPPER.readValue(value, Relation.class),
Encoders.bean(Relation.class));
}
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
}
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
index 86587bfc9..91bcb9d1c 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
@@ -62,6 +63,7 @@ public class SparkRemoveBlacklistedRelationJob {
conf,
isSparkSessionManaged,
spark -> {
+ removeOutputDir(spark, outputPath);
removeBlacklistedRelations(
spark,
blacklistPath,
@@ -69,7 +71,6 @@ public class SparkRemoveBlacklistedRelationJob {
outputPath,
mergesPath);
});
-
}
private static void removeBlacklistedRelations(SparkSession spark, String blacklistPath, String inputPath,
@@ -78,8 +79,6 @@ public class SparkRemoveBlacklistedRelationJob {
Dataset inputRelation = readRelations(spark, inputPath);
Dataset mergesRelation = readRelations(spark, mergesPath);
- log.info("InputRelationCount: {}", inputRelation.count());
-
Dataset dedupSource = blackListed
.joinWith(
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
@@ -102,11 +101,6 @@ public class SparkRemoveBlacklistedRelationJob {
return c._1();
}, Encoders.bean(Relation.class));
- dedupBL
- .write()
- .mode(SaveMode.Overwrite)
- .json(blacklistPath + "/deduped");
-
inputRelation
.joinWith(
dedupBL, (inputRelation
@@ -144,4 +138,8 @@ public class SparkRemoveBlacklistedRelationJob {
Encoders.bean(Relation.class));
}
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
}
diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml
index 59fd30fea..dd7827da4 100644
--- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
postgresURL
@@ -22,6 +22,25 @@
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
@@ -49,8 +68,6 @@
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/publication
${nameNode}/${outputPath}/publication
@@ -60,8 +77,6 @@
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/dataset
${nameNode}/${outputPath}/dataset
@@ -71,8 +86,6 @@
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/otherresearchproduct
${nameNode}/${outputPath}/otherresearchproduct
@@ -82,8 +95,6 @@
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/software
${nameNode}/${outputPath}/software
@@ -93,8 +104,6 @@
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/organization
${nameNode}/${outputPath}/organization
@@ -102,10 +111,8 @@
-
+
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/project
${nameNode}/${outputPath}/project
@@ -113,10 +120,8 @@
-
+
- ${jobTracker}
- ${nameNode}
${nameNode}/${sourcePath}/datasource
${nameNode}/${outputPath}/datasource
@@ -128,8 +133,6 @@
- ${jobTracker}
- ${nameNode}
eu.dnetlib.dhp.blacklist.ReadBlacklistFromDB
--hdfsPath${workingDir}/blacklist
--hdfsNameNode${nameNode}
@@ -156,6 +159,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
--sourcePath${sourcePath}/relation
--outputPath${workingDir}/mergesRelation
@@ -180,6 +184,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
--sourcePath${sourcePath}/relation
--outputPath${outputPath}/relation
diff --git a/dhp-workflows/dhp-blacklist/src/test/java/eu/dnetlib/dhp/blacklist/BlackListTest.java b/dhp-workflows/dhp-blacklist/src/test/java/eu/dnetlib/dhp/blacklist/BlackListTest.java
index bbfd15674..585848589 100644
--- a/dhp-workflows/dhp-blacklist/src/test/java/eu/dnetlib/dhp/blacklist/BlackListTest.java
+++ b/dhp-workflows/dhp-blacklist/src/test/java/eu/dnetlib/dhp/blacklist/BlackListTest.java
@@ -61,12 +61,6 @@ public class BlackListTest {
spark.stop();
}
- /*
- * String inputPath = parser.get("sourcePath"); log.info("inputPath: {}", inputPath); final String outputPath =
- * parser.get("outputPath"); log.info("outputPath {}: ", outputPath); final String blacklistPath =
- * parser.get("hdfsPath"); log.info("blacklistPath {}: ", blacklistPath); final String mergesPath =
- * parser.get("mergesPath"); log.info("mergesPath {}: ", mergesPath);
- */
@Test
public void noRemoveTest() throws Exception {
SparkRemoveBlacklistedRelationJob
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 8b7ec3851..0e84d99a4 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
4.0.0
@@ -57,7 +57,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [1.0.0,2.0.0)
+ [2.0.0,3.0.0)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 0694556b2..9e5d98644 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -29,31 +29,32 @@ public class EventFactory {
"yyyy-MM-dd"
};
- public static Event newBrokerEvent(final Result source, final Result target, final UpdateInfo> updateInfo) {
+ public static Event newBrokerEvent(final UpdateInfo> updateInfo) {
final long now = new Date().getTime();
final Event res = new Event();
- final Map map = createMapFromResult(target, source, updateInfo);
+ final Map map = createMapFromResult(updateInfo);
- final String payload = createPayload(target, updateInfo);
+ final String payload = createPayload(updateInfo);
final String eventId = calculateEventId(
- updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString());
+ updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
+ updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
res.setPayload(payload);
res.setMap(map);
- res.setTopic(updateInfo.getTopic());
+ res.setTopic(updateInfo.getTopicPath());
res.setCreationDate(now);
res.setExpiryDate(calculateExpiryDate(now));
res.setInstantMessage(false);
return res;
}
- private static String createPayload(final Result result, final UpdateInfo> updateInfo) {
+ private static String createPayload(final UpdateInfo> updateInfo) {
final OpenAireEventPayload payload = new OpenAireEventPayload();
// TODO
@@ -62,32 +63,34 @@ public class EventFactory {
return payload.toJSON();
}
- private static Map createMapFromResult(final Result oaf, final Result source,
- final UpdateInfo> updateInfo) {
+ private static Map createMapFromResult(final UpdateInfo> updateInfo) {
final Map map = new HashMap<>();
- final List collectedFrom = oaf.getCollectedfrom();
+ final Result source = updateInfo.getSource();
+ final Result target = updateInfo.getTarget();
+
+ final List collectedFrom = target.getCollectedfrom();
if (collectedFrom.size() == 1) {
map.put("target_datasource_id", collectedFrom.get(0).getKey());
map.put("target_datasource_name", collectedFrom.get(0).getValue());
}
- final List ids = oaf.getOriginalId();
+ final List ids = target.getOriginalId();
if (ids.size() > 0) {
map.put("target_publication_id", ids.get(0));
}
- final List titles = oaf.getTitle();
+ final List titles = target.getTitle();
if (titles.size() > 0) {
map.put("target_publication_title", titles.get(0));
}
- final long date = parseDateTolong(oaf.getDateofacceptance().getValue());
+ final long date = parseDateTolong(target.getDateofacceptance().getValue());
if (date > 0) {
map.put("target_dateofacceptance", date);
}
- final List subjects = oaf.getSubject();
+ final List subjects = target.getSubject();
if (subjects.size() > 0) {
map
.put(
@@ -95,7 +98,7 @@ public class EventFactory {
subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
}
- final List authors = oaf.getAuthor();
+ final List authors = target.getAuthor();
if (authors.size() > 0) {
map
.put(
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java
new file mode 100644
index 000000000..0716bd98d
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java
@@ -0,0 +1,66 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+public enum Topic {
+
+ // ENRICHMENT MISSING
+ ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT(
+ "ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE(
+ "ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID(
+ "ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE(
+ "ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC(
+ "ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV(
+ "ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL(
+ "ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC(
+ "ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM(
+ "ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK(
+ "ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID(
+ "ENRICH/MISSING/AUTHOR/ORCID"),
+
+ // ENRICHMENT MORE
+ ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT(
+ "ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT(
+ "ENRICH/MORE/PROJECT"), ENRICH_MORE_SOFTWARE("ENRICH/MORE/SOFTWARE"), ENRICH_MORE_SUBJECT_MESHEUROPMC(
+ "ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV(
+ "ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL(
+ "ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC(
+ "ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM(
+ "ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
+
+ // ADDITION
+ ADD_BY_PROJECT("ADD/BY_PROJECT"),
+
+ // OTHER RELS
+ ENRICH_MISSING_PUBLICATION_IS_RELATED_TO(
+ "ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"), ENRICH_MISSING_PUBLICATION_REFERENCES(
+ "ENRICH/MISSING/PUBLICATION/REFERENCES"), ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY(
+ "ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"), ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO(
+ "ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"), ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY(
+ "ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"),
+
+ ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"), ENRICH_MISSING_DATASET_REFERENCES(
+ "ENRICH/MISSING/DATASET/REFERENCES"), ENRICH_MISSING_DATASET_IS_REFERENCED_BY(
+ "ENRICH/MISSING/DATASET/IS_REFERENCED_BY"), ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO(
+ "ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"), ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY(
+ "ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY"),;
+
+ Topic(final String path) {
+ this.path = path;
+ }
+
+ protected String path;
+
+ public String getPath() {
+ return this.path;
+ }
+
+ public static Topic fromPath(final String path) {
+ for (final Topic t : Topic.values()) {
+ if (t.getPath().equals(path)) {
+ return t;
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index 54d4ef36a..d5e577972 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -4,12 +4,23 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,25 +30,75 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.EventFactory;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAbstract;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAuthorOrcid;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingOpenAccess;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPid;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingProject;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPublicationDate;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMissingSubject;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMoreOpenAccess;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMorePid;
-import eu.dnetlib.dhp.broker.oa.util.EnrichMoreSubject;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
+import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ // Simple Matchers
+ private static final UpdateMatcher enrichMissingAbstract = new EnrichMissingAbstract();
+ private static final UpdateMatcher enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
+ private static final UpdateMatcher enrichMissingOpenAccess = new EnrichMissingOpenAccess();
+ private static final UpdateMatcher enrichMissingPid = new EnrichMissingPid();
+ private static final UpdateMatcher enrichMissingPublicationDate = new EnrichMissingPublicationDate();
+ private static final UpdateMatcher enrichMissingSubject = new EnrichMissingSubject();
+ private static final UpdateMatcher enrichMoreOpenAccess = new EnrichMoreOpenAccess();
+ private static final UpdateMatcher enrichMorePid = new EnrichMorePid();
+ private static final UpdateMatcher enrichMoreSubject = new EnrichMoreSubject();
+
+ // Advanced matchers
+ private static final UpdateMatcher>, ?> enrichMissingProject = new EnrichMissingProject();
+ private static final UpdateMatcher>, ?> enrichMoreProject = new EnrichMoreProject();
+
+ private static final UpdateMatcher>, ?> enrichMissingSoftware = new EnrichMissingSoftware();
+ private static final UpdateMatcher>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
+
+ private static final UpdateMatcher>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
+ private static final UpdateMatcher>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
+ private static final UpdateMatcher>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
+ private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
+ private static final UpdateMatcher>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
+
+ private static final UpdateMatcher>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
+ private static final UpdateMatcher>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
+ private static final UpdateMatcher>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
+ private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
+ private static final UpdateMatcher>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
+
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -60,9 +121,19 @@ public class GenerateEventsApplication {
log.info("eventsPath: {}", eventsPath);
final SparkConf conf = new SparkConf();
+
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
removeOutputDir(spark, eventsPath);
- generateEvents(spark, graphPath, eventsPath);
+
+ final JavaRDD eventsRdd = sc.emptyRDD();
+
+ eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class));
+ eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class));
+ eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class));
+ eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class));
+
+ eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
});
}
@@ -71,42 +142,139 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
- private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) {
- // TODO
+ private static JavaRDD generateSimpleEvents(final SparkSession spark,
+ final String graphPath,
+ final Class resultClazz) {
+
+ final Dataset results = readPath(
+ spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
+ .filter(r -> r.getDataInfo().getDeletedbyinference());
+
+ final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
+
+ final Column c = null; // TODO
+
+ final Dataset aa = results
+ .joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
+ .groupBy(rels.col("target"))
+ .agg(c)
+ .filter(x -> x.size() > 1)
+ // generateSimpleEvents(...)
+ // flatMap()
+ // toRdd()
+ ;
+
+ return null;
+
}
- private List generateEvents(final Result... children) {
- final List list = new ArrayList<>();
+ private List generateSimpleEvents(final Collection children) {
+ final List> list = new ArrayList<>();
- for (final Result source : children) {
- for (final Result target : children) {
- if (source != target) {
- list
- .addAll(
- findUpdates(source, target)
- .stream()
- .map(info -> EventFactory.newBrokerEvent(source, target, info))
- .collect(Collectors.toList()));
- }
+ for (final Result target : children) {
+ list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMorePid.searchUpdatesForRecord(target, children));
+ list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children));
+ }
+
+ return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+ }
+
+ private List generateProjectsEvents(final Collection>> childrenWithProjects) {
+ final List> list = new ArrayList<>();
+
+ for (final Pair> target : childrenWithProjects) {
+ list.addAll(enrichMissingProject.searchUpdatesForRecord(target, childrenWithProjects));
+ list.addAll(enrichMoreProject.searchUpdatesForRecord(target, childrenWithProjects));
+ }
+
+ return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+ }
+
+ private List generateSoftwareEvents(final Collection>> childrenWithSoftwares) {
+ final List> list = new ArrayList<>();
+
+ for (final Pair> target : childrenWithSoftwares) {
+ list.addAll(enrichMissingSoftware.searchUpdatesForRecord(target, childrenWithSoftwares));
+ list.addAll(enrichMoreSoftware.searchUpdatesForRecord(target, childrenWithSoftwares));
+ }
+ return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+ }
+
+ private List generatePublicationRelatedEvents(final String relType,
+ final Collection>>> childrenWithRels) {
+
+ final List> list = new ArrayList<>();
+
+ final List>> cleanedChildrens = childrenWithRels
+ .stream()
+ .filter(p -> p.getRight().containsKey(relType))
+ .map(p -> Pair.of(p.getLeft(), p.getRight().get(relType)))
+ .filter(p -> p.getRight().size() > 0)
+ .collect(Collectors.toList());
+
+ for (final Pair> target : cleanedChildrens) {
+ if (relType.equals("isRelatedTo")) {
+ list.addAll(enrichMisissingPublicationIsRelatedTo.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("references")) {
+ list.addAll(enrichMissingPublicationReferences.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("isReferencedBy")) {
+ list.addAll(enrichMissingPublicationIsReferencedBy.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("isSupplementedTo")) {
+ list.addAll(enrichMissingPublicationIsSupplementedTo.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("isSupplementedBy")) {
+ list.addAll(enrichMissingPublicationIsSupplementedBy.searchUpdatesForRecord(target, cleanedChildrens));
}
}
- return list;
+ return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+
}
- private List> findUpdates(final Result source, final Result target) {
+ private List generateDatasetRelatedEvents(final String relType,
+ final Collection>>> childrenWithRels) {
+
final List> list = new ArrayList<>();
- list.addAll(EnrichMissingAbstract.findUpdates(source, target));
- list.addAll(EnrichMissingAuthorOrcid.findUpdates(source, target));
- list.addAll(EnrichMissingOpenAccess.findUpdates(source, target));
- list.addAll(EnrichMissingPid.findUpdates(source, target));
- list.addAll(EnrichMissingProject.findUpdates(source, target));
- list.addAll(EnrichMissingPublicationDate.findUpdates(source, target));
- list.addAll(EnrichMissingSubject.findUpdates(source, target));
- list.addAll(EnrichMoreOpenAccess.findUpdates(source, target));
- list.addAll(EnrichMorePid.findUpdates(source, target));
- list.addAll(EnrichMoreSubject.findUpdates(source, target));
- return list;
+
+ final List>> cleanedChildrens = childrenWithRels
+ .stream()
+ .filter(p -> p.getRight().containsKey(relType))
+ .map(p -> Pair.of(p.getLeft(), p.getRight().get(relType)))
+ .filter(p -> p.getRight().size() > 0)
+ .collect(Collectors.toList());
+
+ for (final Pair> target : cleanedChildrens) {
+ if (relType.equals("isRelatedTo")) {
+ list.addAll(enrichMisissingDatasetIsRelatedTo.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("references")) {
+ list.addAll(enrichMissingDatasetReferences.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("isReferencedBy")) {
+ list.addAll(enrichMissingDatasetIsReferencedBy.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("isSupplementedTo")) {
+ list.addAll(enrichMissingDatasetIsSupplementedTo.searchUpdatesForRecord(target, cleanedChildrens));
+ } else if (relType.equals("isSupplementedBy")) {
+ list.addAll(enrichMissingDatasetIsSupplementedBy.searchUpdatesForRecord(target, cleanedChildrens));
+ }
+ }
+
+ return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
+
}
+ public static Dataset readPath(
+ final SparkSession spark,
+ final String inputPath,
+ final Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
new file mode 100644
index 000000000..5bfe108a5
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -0,0 +1,68 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Field;
+
+public abstract class UpdateMatcher {
+
+ private final boolean multipleUpdate;
+
+ public UpdateMatcher(final boolean multipleUpdate) {
+ this.multipleUpdate = multipleUpdate;
+ }
+
+ public Collection> searchUpdatesForRecord(final K res, final Collection others) {
+
+ final Map> infoMap = new HashMap<>();
+
+ for (final K source : others) {
+ if (source != res) {
+ for (final UpdateInfo info : findUpdates(source, res)) {
+ final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
+ if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
+ } else {
+ infoMap.put(s, info);
+ }
+ }
+ }
+ }
+
+ final Collection> values = infoMap.values();
+
+ if (values.isEmpty() || multipleUpdate) {
+ return values;
+ } else {
+ final UpdateInfo v = values
+ .stream()
+ .sorted((o1, o2) -> Float.compare(o1.getTrust(), o2.getTrust()))
+ .findFirst()
+ .get();
+ return Arrays.asList(v);
+ }
+ }
+
+ protected abstract List> findUpdates(K source, K target);
+
+ protected abstract UpdateInfo generateUpdateInfo(final T highlightValue,
+ final K source,
+ final K target);
+
+ protected static boolean isMissing(final List> list) {
+ return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());
+ }
+
+ protected boolean isMissing(final Field field) {
+ return field == null || StringUtils.isBlank(field.getValue());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
new file mode 100644
index 000000000..321fd4318
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
@@ -0,0 +1,63 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public abstract class AbstractEnrichMissingDataset
+ extends UpdateMatcher>, eu.dnetlib.broker.objects.Dataset> {
+
+ private final Topic topic;
+
+ public AbstractEnrichMissingDataset(final Topic topic) {
+ super(true);
+ this.topic = topic;
+ }
+
+ @Override
+ protected final List> findUpdates(
+ final Pair> source,
+ final Pair> target) {
+
+ final Set existingDatasets = target
+ .getRight()
+ .stream()
+ .map(Dataset::getId)
+ .collect(Collectors.toSet());
+
+ return source
+ .getRight()
+ .stream()
+ .filter(d -> !existingDatasets.contains(d.getId()))
+ .map(ConversionUtils::oafDatasetToBrokerDataset)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+
+ }
+
+ @Override
+ protected final UpdateInfo generateUpdateInfo(
+ final eu.dnetlib.broker.objects.Dataset highlightValue,
+ final Pair> source,
+ final Pair> target) {
+ return new UpdateInfo<>(
+ getTopic(),
+ highlightValue, source.getLeft(), target.getLeft(),
+ (p, rel) -> p.getDatasets().add(rel),
+ rel -> rel.getInstances().get(0).getUrl());
+ }
+
+ public Topic getTopic() {
+ return topic;
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java
new file mode 100644
index 000000000..74ce761f4
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsReferencedBy.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingDatasetIsReferencedBy extends AbstractEnrichMissingDataset {
+
+ public EnrichMissingDatasetIsReferencedBy() {
+ super(Topic.ENRICH_MISSING_DATASET_IS_REFERENCED_BY);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java
new file mode 100644
index 000000000..05a891059
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsRelatedTo.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingDatasetIsRelatedTo extends AbstractEnrichMissingDataset {
+
+ public EnrichMissingDatasetIsRelatedTo() {
+ super(Topic.ENRICH_MISSING_DATASET_IS_RELATED_TO);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java
new file mode 100644
index 000000000..23bd68fa1
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedBy.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingDatasetIsSupplementedBy extends AbstractEnrichMissingDataset {
+
+ public EnrichMissingDatasetIsSupplementedBy() {
+ super(Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java
new file mode 100644
index 000000000..03160b6f0
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetIsSupplementedTo.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingDatasetIsSupplementedTo extends AbstractEnrichMissingDataset {
+
+ public EnrichMissingDatasetIsSupplementedTo() {
+ super(Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java
new file mode 100644
index 000000000..bf1df053d
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/EnrichMissingDatasetReferences.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingDatasetReferences extends AbstractEnrichMissingDataset {
+
+ public EnrichMissingDatasetReferences() {
+ super(Topic.ENRICH_MISSING_DATASET_REFERENCES);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
new file mode 100644
index 000000000..461266d56
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
@@ -0,0 +1,41 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingProject
+ extends UpdateMatcher>, eu.dnetlib.broker.objects.Project> {
+
+ public EnrichMissingProject() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(final Pair> source,
+ final Pair> target) {
+ // TODO
+ return Arrays.asList();
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(
+ final eu.dnetlib.broker.objects.Project highlightValue,
+ final Pair> source,
+ final Pair> target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_PROJECT,
+ highlightValue, source.getLeft(), target.getLeft(),
+ (p, prj) -> p.getProjects().add(prj),
+ prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
new file mode 100644
index 000000000..d9bfb62d5
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
@@ -0,0 +1,40 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMoreProject extends UpdateMatcher>, eu.dnetlib.broker.objects.Project> {
+
+ public EnrichMoreProject() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(final Pair> source,
+ final Pair> target) {
+ // TODO
+ return Arrays.asList();
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(
+ final eu.dnetlib.broker.objects.Project highlightValue,
+ final Pair> source,
+ final Pair> target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MORE_PROJECT,
+ highlightValue, source.getLeft(), target.getLeft(),
+ (p, prj) -> p.getProjects().add(prj),
+ prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
new file mode 100644
index 000000000..75e77b3c6
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
@@ -0,0 +1,63 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public abstract class AbstractEnrichMissingPublication
+ extends UpdateMatcher>, eu.dnetlib.broker.objects.Publication> {
+
+ private final Topic topic;
+
+ public AbstractEnrichMissingPublication(final Topic topic) {
+ super(true);
+ this.topic = topic;
+ }
+
+ @Override
+ protected final List> findUpdates(
+ final Pair> source,
+ final Pair> target) {
+
+ final Set existingPublications = target
+ .getRight()
+ .stream()
+ .map(Publication::getId)
+ .collect(Collectors.toSet());
+
+ return source
+ .getRight()
+ .stream()
+ .filter(d -> !existingPublications.contains(d.getId()))
+ .map(ConversionUtils::oafPublicationToBrokerPublication)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+
+ }
+
+ @Override
+ protected final UpdateInfo generateUpdateInfo(
+ final eu.dnetlib.broker.objects.Publication highlightValue,
+ final Pair> source,
+ final Pair> target) {
+ return new UpdateInfo<>(
+ getTopic(),
+ highlightValue, source.getLeft(), target.getLeft(),
+ (p, rel) -> p.getPublications().add(rel),
+ rel -> rel.getInstances().get(0).getUrl());
+ }
+
+ public Topic getTopic() {
+ return topic;
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java
new file mode 100644
index 000000000..73fa8a45f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsReferencedBy.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingPublicationIsReferencedBy extends AbstractEnrichMissingPublication {
+
+ public EnrichMissingPublicationIsReferencedBy() {
+ super(Topic.ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java
new file mode 100644
index 000000000..361ea3b34
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsRelatedTo.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingPublicationIsRelatedTo extends AbstractEnrichMissingPublication {
+
+ public EnrichMissingPublicationIsRelatedTo() {
+ super(Topic.ENRICH_MISSING_PUBLICATION_IS_RELATED_TO);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java
new file mode 100644
index 000000000..7e8863b1e
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedBy.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingPublicationIsSupplementedBy extends AbstractEnrichMissingPublication {
+
+ public EnrichMissingPublicationIsSupplementedBy() {
+ super(Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java
new file mode 100644
index 000000000..dc4e51377
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationIsSupplementedTo.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingPublicationIsSupplementedTo extends AbstractEnrichMissingPublication {
+
+ public EnrichMissingPublicationIsSupplementedTo() {
+ super(Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java
new file mode 100644
index 000000000..5198098bc
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/EnrichMissingPublicationReferences.java
@@ -0,0 +1,12 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+
+public class EnrichMissingPublicationReferences extends AbstractEnrichMissingPublication {
+
+ public EnrichMissingPublicationReferences() {
+ super(Topic.ENRICH_MISSING_PUBLICATION_REFERENCES);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
new file mode 100644
index 000000000..a418b633e
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
@@ -0,0 +1,38 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingAbstract extends UpdateMatcher {
+
+ public EnrichMissingAbstract() {
+ super(false);
+ }
+
+ @Override
+ protected List> findUpdates(final Result source, final Result target) {
+ if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) {
+ return Arrays.asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target));
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(final String highlightValue,
+ final Result source,
+ final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_ABSTRACT,
+ highlightValue, source, target,
+ (p, s) -> p.getAbstracts().add(s),
+ s -> s);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
new file mode 100644
index 000000000..b5c2f7e72
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
@@ -0,0 +1,36 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingAuthorOrcid extends UpdateMatcher> {
+
+ public EnrichMissingAuthorOrcid() {
+ super(true);
+ }
+
+ @Override
+ protected List>> findUpdates(final Result source, final Result target) {
+ // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
+ return Arrays.asList();
+ }
+
+ @Override
+ public UpdateInfo> generateUpdateInfo(final Pair highlightValue,
+ final Result source,
+ final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_AUTHOR_ORCID,
+ highlightValue, source, target,
+ (p, pair) -> p.getCreators().add(pair.getLeft() + " - ORCID: " + pair.getRight()),
+ pair -> pair.getLeft() + "::" + pair.getRight());
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
new file mode 100644
index 000000000..c7e9dcbc1
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
@@ -0,0 +1,56 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingOpenAccess extends UpdateMatcher {
+
+ public EnrichMissingOpenAccess() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(final Result source, final Result target) {
+ final long count = target
+ .getInstance()
+ .stream()
+ .map(i -> i.getAccessright().getClassid())
+ .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
+ .count();
+
+ if (count > 0) {
+ return Arrays.asList();
+ }
+
+ return source
+ .getInstance()
+ .stream()
+ .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
+ .map(ConversionUtils::oafInstanceToBrokerInstances)
+ .flatMap(s -> s)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(final Instance highlightValue,
+ final Result source,
+ final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_OA_VERSION,
+ highlightValue, source, target,
+ (p, i) -> p.getInstances().add(i),
+ Instance::getUrl);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
new file mode 100644
index 000000000..522d46d40
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
@@ -0,0 +1,46 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingPid extends UpdateMatcher {
+
+ public EnrichMissingPid() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(final Result source, final Result target) {
+ final long count = target.getPid().size();
+
+ if (count > 0) {
+ return Arrays.asList();
+ }
+
+ return source
+ .getPid()
+ .stream()
+ .map(ConversionUtils::oafPidToBrokerPid)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(final Pid highlightValue, final Result source, final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_PID,
+ highlightValue, source, target,
+ (p, pid) -> p.getPids().add(pid),
+ pid -> pid.getType() + "::" + pid.getValue());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
new file mode 100644
index 000000000..197ace97c
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
@@ -0,0 +1,38 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMissingPublicationDate extends UpdateMatcher {
+
+ public EnrichMissingPublicationDate() {
+ super(false);
+ }
+
+ @Override
+ protected List> findUpdates(final Result source, final Result target) {
+ if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) {
+ return Arrays.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target));
+ }
+ return new ArrayList<>();
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(final String highlightValue,
+ final Result source,
+ final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_PUBLICATION_DATE,
+ highlightValue, source, target,
+ (p, date) -> p.setPublicationdate(date),
+ s -> s);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java
new file mode 100644
index 000000000..4fcba43a4
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
+
+public class EnrichMissingSoftware
+ extends UpdateMatcher>, eu.dnetlib.broker.objects.Software> {
+
+ public EnrichMissingSoftware() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(
+ final Pair> source,
+ final Pair> target) {
+ // TODO
+ return Arrays.asList();
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(
+ final eu.dnetlib.broker.objects.Software highlightValue,
+ final Pair> source,
+ final Pair> target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MISSING_SOFTWARE,
+ highlightValue, source.getLeft(), target.getLeft(),
+ (p, s) -> p.getSoftwares().add(s),
+ s -> s.getName());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
new file mode 100644
index 000000000..290bad48b
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class EnrichMissingSubject extends UpdateMatcher> {
+
+ public EnrichMissingSubject() {
+ super(true);
+ }
+
+ @Override
+ protected List>> findUpdates(final Result source, final Result target) {
+ final Set existingTypes = target
+ .getSubject()
+ .stream()
+ .map(StructuredProperty::getQualifier)
+ .map(Qualifier::getClassid)
+ .collect(Collectors.toSet());
+
+ return source
+ .getPid()
+ .stream()
+ .filter(pid -> !existingTypes.contains(pid.getQualifier().getClassid()))
+ .map(ConversionUtils::oafSubjectToPair)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public UpdateInfo> generateUpdateInfo(final Pair highlightValue,
+ final Result source,
+ final Result target) {
+
+ return new UpdateInfo<>(
+ Topic.fromPath("ENRICH/MISSING/SUBJECT/" + highlightValue.getLeft()),
+ highlightValue, source, target,
+ (p, pair) -> p.getSubjects().add(pair.getRight()),
+ pair -> pair.getLeft() + "::" + pair.getRight());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
new file mode 100644
index 000000000..c376da44d
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMoreOpenAccess extends UpdateMatcher {
+
+ public EnrichMoreOpenAccess() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(final Result source, final Result target) {
+ final Set urls = target
+ .getInstance()
+ .stream()
+ .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
+ .map(i -> i.getUrl())
+ .flatMap(List::stream)
+ .collect(Collectors.toSet());
+
+ return source
+ .getInstance()
+ .stream()
+ .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
+ .map(ConversionUtils::oafInstanceToBrokerInstances)
+ .flatMap(s -> s)
+ .filter(i -> !urls.contains(i.getUrl()))
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(final Instance highlightValue,
+ final Result source,
+ final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MORE_OA_VERSION,
+ highlightValue, source, target,
+ (p, i) -> p.getInstances().add(i),
+ Instance::getUrl);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
new file mode 100644
index 000000000..2ee327c83
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
@@ -0,0 +1,47 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMorePid extends UpdateMatcher {
+
+ public EnrichMorePid() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(final Result source, final Result target) {
+ final Set existingPids = target
+ .getPid()
+ .stream()
+ .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
+ .collect(Collectors.toSet());
+
+ return source
+ .getPid()
+ .stream()
+ .filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
+ .map(ConversionUtils::oafPidToBrokerPid)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(final Pid highlightValue, final Result source, final Result target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MORE_PID,
+ highlightValue, source, target,
+ (p, pid) -> p.getPids().add(pid),
+ pid -> pid.getType() + "::" + pid.getValue());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java
new file mode 100644
index 000000000..a1affff62
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
+
+public class EnrichMoreSoftware
+ extends UpdateMatcher>, eu.dnetlib.broker.objects.Software> {
+
+ public EnrichMoreSoftware() {
+ super(true);
+ }
+
+ @Override
+ protected List> findUpdates(
+ final Pair> source,
+ final Pair> target) {
+ // TODO
+ return Arrays.asList();
+ }
+
+ @Override
+ public UpdateInfo generateUpdateInfo(
+ final eu.dnetlib.broker.objects.Software highlightValue,
+ final Pair> source,
+ final Pair> target) {
+ return new UpdateInfo<>(
+ Topic.ENRICH_MORE_SOFTWARE,
+ highlightValue, source.getLeft(), target.getLeft(),
+ (p, s) -> p.getSoftwares().add(s),
+ s -> s.getName());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
new file mode 100644
index 000000000..b38445e88
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
@@ -0,0 +1,51 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+public class EnrichMoreSubject extends UpdateMatcher> {
+
+ public EnrichMoreSubject() {
+ super(true);
+ }
+
+ @Override
+ protected List>> findUpdates(final Result source, final Result target) {
+ final Set existingSubjects = target
+ .getSubject()
+ .stream()
+ .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
+ .collect(Collectors.toSet());
+
+ return source
+ .getPid()
+ .stream()
+ .filter(pid -> !existingSubjects.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
+ .map(ConversionUtils::oafSubjectToPair)
+ .map(i -> generateUpdateInfo(i, source, target))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public UpdateInfo> generateUpdateInfo(final Pair highlightValue,
+ final Result source,
+ final Result target) {
+
+ return new UpdateInfo<>(
+ Topic.fromPath("ENRICH/MORE/SUBJECT/" + highlightValue.getLeft()),
+ highlightValue, source, target,
+ (p, pair) -> p.getSubjects().add(pair.getRight()),
+ pair -> pair.getLeft() + "::" + pair.getRight());
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
new file mode 100644
index 000000000..8e97192bf
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
@@ -0,0 +1,9 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+public class BrokerConstants {
+
+ public final static String OPEN_ACCESS = "OPEN";
+ public final static String IS_MERGED_IN_CLASS = "isMergedIn";
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
new file mode 100644
index 000000000..2f87d0ee7
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -0,0 +1,49 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+
+public class ConversionUtils {
+
+ public static Stream oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
+ return i.getUrl().stream().map(url -> {
+ final Instance r = new Instance();
+ r.setUrl(url);
+ r.setInstancetype(i.getInstancetype().getClassid());
+ r.setLicense(BrokerConstants.OPEN_ACCESS);
+ r.setHostedby(i.getHostedby().getValue());
+ return r;
+ });
+ }
+
+ public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
+ final Pid pid = new Pid();
+ pid.setValue(sp.getValue());
+ pid.setType(sp.getQualifier().getClassid());
+ return pid;
+ }
+
+ public static final Pair oafSubjectToPair(final StructuredProperty sp) {
+ return Pair.of(sp.getQualifier().getClassid(), sp.getValue());
+ }
+
+ public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
+ final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
+ // TODO
+ return res;
+ }
+
+ public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) {
+ final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
+ // TODO
+ return res;
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java
deleted file mode 100644
index 493d1f97c..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java
+++ /dev/null
@@ -1,31 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingAbstract extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMissingAbstract(final String highlightValue, final float trust) {
- super("ENRICH/MISSING/ABSTRACT", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getAbstracts().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java
deleted file mode 100644
index 6899c62a3..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java
+++ /dev/null
@@ -1,31 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingAuthorOrcid extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMissingAuthorOrcid(final String highlightValue, final float trust) {
- super("ENRICH/MISSING/AUTHOR/ORCID", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- // TODO
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java
deleted file mode 100644
index 9464130f3..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java
+++ /dev/null
@@ -1,32 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.Instance;
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingOpenAccess extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMissingOpenAccess(final Instance highlightValue, final float trust) {
- super("ENRICH/MISSING/OPENACCESS_VERSION", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getInstances().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue().getUrl();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java
deleted file mode 100644
index 293d4993f..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java
+++ /dev/null
@@ -1,32 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.broker.objects.Pid;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingPid extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMissingPid(final Pid highlightValue, final float trust) {
- super("ENRICH/MISSING/PID", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getPids().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue().getType() + "::" + getHighlightValue().getValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java
deleted file mode 100644
index a22c179a2..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java
+++ /dev/null
@@ -1,33 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.broker.objects.Project;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingProject extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMissingProject(final Project highlightValue, final float trust) {
- super("ENRICH/MISSING/PROJECT", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getProjects().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue().getFunder() + "::" + getHighlightValue().getFundingProgram()
- + getHighlightValue().getCode();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java
deleted file mode 100644
index 869dca264..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java
+++ /dev/null
@@ -1,31 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingPublicationDate extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMissingPublicationDate(final String highlightValue, final float trust) {
- super("ENRICH/MISSING/PUBLICATION_DATE", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().setPublicationdate(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java
deleted file mode 100644
index a2ed5d043..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java
+++ /dev/null
@@ -1,36 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMissingSubject extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // MESHEUROPMC
- // ARXIV
- // JEL
- // DDC
- // ACM
-
- return Arrays.asList();
- }
-
- private EnrichMissingSubject(final String subjectClassification, final String highlightValue, final float trust) {
- super("ENRICH/MISSING/SUBJECT/" + subjectClassification, highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getSubjects().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java
deleted file mode 100644
index 4f1e88d3d..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java
+++ /dev/null
@@ -1,32 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.Instance;
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMoreOpenAccess extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMoreOpenAccess(final Instance highlightValue, final float trust) {
- super("ENRICH/MORE/OPENACCESS_VERSION", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getInstances().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue().getUrl();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java
deleted file mode 100644
index ecf2cf310..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java
+++ /dev/null
@@ -1,32 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.broker.objects.Pid;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMorePid extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
- return Arrays.asList();
- }
-
- private EnrichMorePid(final Pid highlightValue, final float trust) {
- super("ENRICH/MORE/PID", highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getPids().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue().getType() + "::" + getHighlightValue().getValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java
deleted file mode 100644
index f29b86292..000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java
+++ /dev/null
@@ -1,36 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
-import eu.dnetlib.dhp.schema.oaf.Result;
-
-public class EnrichMoreSubject extends UpdateInfo {
-
- public static List findUpdates(final Result source, final Result target) {
- // MESHEUROPMC
- // ARXIV
- // JEL
- // DDC
- // ACM
-
- return Arrays.asList();
- }
-
- private EnrichMoreSubject(final String subjectClassification, final String highlightValue, final float trust) {
- super("ENRICH/MORE/SUBJECT/" + subjectClassification, highlightValue, trust);
- }
-
- @Override
- public void compileHighlight(final OpenAireEventPayload payload) {
- payload.getHighlight().getSubjects().add(getHighlightValue());
- }
-
- @Override
- public String getHighlightValueAsString() {
- return getHighlightValue();
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index f7b6b69e9..5cc0d371d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -1,36 +1,77 @@
package eu.dnetlib.dhp.broker.oa.util;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.Publication;
+import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.schema.oaf.Result;
-public abstract class UpdateInfo {
+public final class UpdateInfo {
- private final String topic;
+ private final Topic topic;
private final T highlightValue;
+ private final Result source;
+
+ private final Result target;
+
+ private final BiConsumer compileHighlight;
+
+ private final Function highlightToString;
+
private final float trust;
- protected UpdateInfo(final String topic, final T highlightValue, final float trust) {
+ public UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target,
+ final BiConsumer compileHighlight,
+ final Function highlightToString) {
this.topic = topic;
this.highlightValue = highlightValue;
- this.trust = trust;
+ this.source = source;
+ this.target = target;
+ this.compileHighlight = compileHighlight;
+ this.highlightToString = highlightToString;
+ this.trust = calculateTrust(source, target);
}
public T getHighlightValue() {
return highlightValue;
}
+ public Result getSource() {
+ return source;
+ }
+
+ public Result getTarget() {
+ return target;
+ }
+
+ private float calculateTrust(final Result source, final Result target) {
+ // TODO
+ return 0.9f;
+ }
+
+ protected Topic getTopic() {
+ return topic;
+ }
+
+ public String getTopicPath() {
+ return topic.getPath();
+ }
+
public float getTrust() {
return trust;
}
- public String getTopic() {
- return topic;
+ public void compileHighlight(final OpenAireEventPayload payload) {
+ compileHighlight.accept(payload.getHighlight(), getHighlightValue());
}
- abstract public void compileHighlight(OpenAireEventPayload payload);
-
- abstract public String getHighlightValueAsString();
+ public String getHighlightValueAsString() {
+ return highlightToString.apply(getHighlightValue());
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index fcc356ac0..44cf9e67c 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -3,7 +3,7 @@
dhp-workflows
eu.dnetlib.dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
4.0.0
dhp-dedup-openaire
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java
new file mode 100644
index 000000000..ee5fd5165
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AuthorMerger.java
@@ -0,0 +1,170 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.text.Normalizer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.wcohen.ss.JaroWinkler;
+
+import eu.dnetlib.dhp.schema.oaf.Author;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.pace.model.Person;
+import scala.Tuple2;
+
+public class AuthorMerger {
+
+ private static final Double THRESHOLD = 0.95;
+
+ public static List merge(List> authors) {
+
+ authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2)));
+
+ List author = new ArrayList<>();
+
+ for (List a : authors) {
+ author = mergeAuthor(author, a);
+ }
+
+ return author;
+
+ }
+
+ public static List mergeAuthor(final List a, final List b) {
+ int pa = countAuthorsPids(a);
+ int pb = countAuthorsPids(b);
+ List base, enrich;
+ int sa = authorsSize(a);
+ int sb = authorsSize(b);
+
+ if (pa == pb) {
+ base = sa > sb ? a : b;
+ enrich = sa > sb ? b : a;
+ } else {
+ base = pa > pb ? a : b;
+ enrich = pa > pb ? b : a;
+ }
+ enrichPidFromList(base, enrich);
+ return base;
+ }
+
+ private static void enrichPidFromList(List base, List enrich) {
+ if (base == null || enrich == null)
+ return;
+ final Map basePidAuthorMap = base
+ .stream()
+ .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .flatMap(
+ a -> a
+ .getPid()
+ .stream()
+ .map(p -> new Tuple2<>(pidToComparableString(p), a)))
+ .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
+
+ final List> pidToEnrich = enrich
+ .stream()
+ .filter(a -> a.getPid() != null && a.getPid().size() > 0)
+ .flatMap(
+ a -> a
+ .getPid()
+ .stream()
+ .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
+ .map(p -> new Tuple2<>(p, a)))
+ .collect(Collectors.toList());
+
+ pidToEnrich
+ .forEach(
+ a -> {
+ Optional> simAuthor = base
+ .stream()
+ .map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
+ .max(Comparator.comparing(Tuple2::_1));
+
+ if (simAuthor.isPresent()) {
+ double th = THRESHOLD;
+ // increase the threshold if the surname is too short
+ if (simAuthor.get()._2().getSurname() != null
+ && simAuthor.get()._2().getSurname().length() <= 3)
+ th = 0.99;
+
+ if (simAuthor.get()._1() > th) {
+ Author r = simAuthor.get()._2();
+ if (r.getPid() == null) {
+ r.setPid(new ArrayList<>());
+ }
+ r.getPid().add(a._1());
+ }
+ }
+ });
+ }
+
+ public static String pidToComparableString(StructuredProperty pid) {
+ return (pid.getQualifier() != null
+ ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
+ : "")
+ + (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
+ }
+
+ public static int countAuthorsPids(List authors) {
+ if (authors == null)
+ return 0;
+
+ return (int) authors.stream().filter(AuthorMerger::hasPid).count();
+ }
+
+ private static int authorsSize(List authors) {
+ if (authors == null)
+ return 0;
+ return authors.size();
+ }
+
+ private static Double sim(Author a, Author b) {
+
+ final Person pa = parse(a);
+ final Person pb = parse(b);
+
+ // if both are accurate (e.g. they have name and surname)
+ if (pa.isAccurate() & pb.isAccurate()) {
+ return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5
+ + new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5;
+ } else {
+ return new JaroWinkler()
+ .score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
+ }
+ }
+
+ private static boolean hasPid(Author a) {
+ if (a == null || a.getPid() == null || a.getPid().size() == 0)
+ return false;
+ return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
+ }
+
+ private static Person parse(Author author) {
+ if (StringUtils.isNotBlank(author.getSurname())) {
+ return new Person(author.getSurname() + ", " + author.getName(), false);
+ } else {
+ return new Person(author.getFullname(), false);
+ }
+ }
+
+ private static String normalize(final String s) {
+ return nfd(s)
+ .toLowerCase()
+ // do not compact the regexes in a single expression, would cause StackOverflowError
+ // in case
+ // of large input strings
+ .replaceAll("(\\W)+", " ")
+ .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
+ .replaceAll("(\\p{Punct})+", " ")
+ .replaceAll("(\\d)+", " ")
+ .replaceAll("(\\n)+", " ")
+ .trim();
+ }
+
+ private static String nfd(final String s) {
+ return Normalizer.normalize(s, Normalizer.Form.NFD);
+ }
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index fa06424d7..8028d5a94 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -1,8 +1,10 @@
package eu.dnetlib.dhp.oa.dedup;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -67,16 +69,19 @@ public class DedupRecordFactory {
(MapFunction, String>) entity -> entity._1(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction, T>) (key,
- values) -> entityMerger(key, values, ts, dataInfo),
+ values) -> entityMerger(key, values, ts, dataInfo, clazz),
Encoders.bean(clazz));
}
- private static T entityMerger(
- String id, Iterator> entities, long ts, DataInfo dataInfo) {
+ public static T entityMerger(
+ String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz)
+ throws IllegalAccessException, InstantiationException {
- T entity = entities.next()._2();
+ T entity = clazz.newInstance();
final Collection dates = Lists.newArrayList();
+ final List> authors = Lists.newArrayList();
+
entities
.forEachRemaining(
t -> {
@@ -84,17 +89,17 @@ public class DedupRecordFactory {
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
- Result er = (Result) entity;
- er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
-
- if (r1.getDateofacceptance() != null) {
+ if (r1.getAuthor() != null && r1.getAuthor().size() > 0)
+ authors.add(r1.getAuthor());
+ if (r1.getDateofacceptance() != null)
dates.add(r1.getDateofacceptance().getValue());
- }
}
});
+ // set authors and date
if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
+ ((Result) entity).setAuthor(AuthorMerger.merge(authors));
}
entity.setId(id);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
index d3ae8ee4f..222794d64 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
@@ -32,7 +32,6 @@ import eu.dnetlib.pace.model.Person;
import scala.Tuple2;
public class DedupUtility {
- private static final Double THRESHOLD = 0.95;
public static Map constructAccumulator(
final DedupConfig dedupConf, final SparkContext context) {
@@ -82,58 +81,6 @@ public class DedupUtility {
}
}
- public static List mergeAuthor(final List a, final List b) {
- int pa = countAuthorsPids(a);
- int pb = countAuthorsPids(b);
- List base, enrich;
- int sa = authorsSize(a);
- int sb = authorsSize(b);
-
- if (pa == pb) {
- base = sa > sb ? a : b;
- enrich = sa > sb ? b : a;
- } else {
- base = pa > pb ? a : b;
- enrich = pa > pb ? b : a;
- }
- enrichPidFromList(base, enrich);
- return base;
- }
-
- private static void enrichPidFromList(List