diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index f060a60bc..eddc042c6 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -61,7 +61,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [2.0.1,3.0.0)
+ [3.0.1,4.0.0)
diff --git a/dhp-workflows/dhp-broker-events/report.xml b/dhp-workflows/dhp-broker-events/report.xml
new file mode 100644
index 000000000..6e706f723
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/report.xml
@@ -0,0 +1,37 @@
+
+
+
+ Feature Extraction
+
+
+ TCPFLOW
+ 1.5.0
+
+ 4.2.1 (4.2.1 Compatible Apple LLVM 11.0.0 (clang-1100.0.33.8))
+ -D_THREAD_SAFE -pthread -I/usr/local/include -I/usr/local/include -DUTC_OFFSET=+0000
+ -g -D_THREAD_SAFE -pthread -g -O3 -MD -Wpointer-arith -Wmissing-declarations -Wmissing-prototypes -Wshadow -Wwrite-strings -Wcast-align -Waggregate-return -Wbad-function-cast -Wcast-qual -Wundef -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wc++-compat -Wmissing-noreturn -Wall -Wstrict-prototypes -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wmissing-declarations -Wmissing-prototypes -Wshadow -Wwrite-strings -Wcast-align -Waggregate-return -Wbad-function-cast -Wcast-qual -Wundef -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wc++-compat -Wmissing-noreturn -Wall -Wstrict-prototypes
+ -g -D_THREAD_SAFE -pthread -g -O3 -Wall -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wshadow -Wwrite-strings -Wcast-align -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wmissing-noreturn -Woverloaded-virtual -Wsign-promo -funit-at-a-time -Weffc++ -std=c++11 -Wall -MD -D_FORTIFY_SOURCE=2 -Wpointer-arith -Wshadow -Wwrite-strings -Wcast-align -Wredundant-decls -Wdisabled-optimization -Wfloat-equal -Wmultichar -Wmissing-noreturn -Woverloaded-virtual -Wsign-promo -funit-at-a-time -Weffc++
+ -L/usr/local/lib -L/usr/local/lib
+ -lpython2.7 -lpython2.7 -lpcap -lbz2 -lexpat -lsqlite3 -lcrypto -lssl -lcrypto -ldl -lz
+ 2019-10-11T01:16:58
+
+
+
+
+ Darwin
+ 19.5.0
+ Darwin Kernel Version 19.5.0: Tue May 26 20:41:44 PDT 2020; root:xnu-6153.121.2~2/RELEASE_X86_64
+ Micheles-MBP.local
+ x86_64
+ tcpflow
+ 501
+ michele
+ 2020-06-15T14:55:03Z
+
+
+
+
+ 0
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
index 0512a3813..f94d286e4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Event.java
@@ -1,9 +1,15 @@
package eu.dnetlib.dhp.broker.model;
+import java.io.Serializable;
import java.util.Map;
-public class Event {
+public class Event implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -5936790326505781395L;
private String eventId;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 9146cf422..bf4f62d24 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -6,17 +6,13 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class EventFactory {
@@ -37,8 +33,7 @@ public class EventFactory {
final Map map = createMapFromResult(updateInfo);
final String eventId = calculateEventId(
- updateInfo.getTopicPath(), updateInfo.getTarget().getResult().getOriginalId().get(0),
- updateInfo.getHighlightValueAsString());
+ updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId(), updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
@@ -54,53 +49,31 @@ public class EventFactory {
private static Map createMapFromResult(final UpdateInfo> updateInfo) {
final Map map = new HashMap<>();
- final Result source = updateInfo.getSource().getResult();
- final Result target = updateInfo.getTarget().getResult();
+ final OpenaireBrokerResult source = updateInfo.getSource();
+ final OpenaireBrokerResult target = updateInfo.getTarget();
- final List collectedFrom = target.getCollectedfrom();
- if (collectedFrom.size() == 1) {
- map.put("target_datasource_id", collectedFrom.get(0).getKey());
- map.put("target_datasource_name", collectedFrom.get(0).getValue());
- }
+ map.put("target_datasource_id", target.getCollectedFromId());
+ map.put("target_datasource_name", target.getCollectedFromName());
- final List ids = target.getOriginalId();
- if (ids.size() > 0) {
- map.put("target_publication_id", ids.get(0));
- }
+ map.put("target_publication_id", target.getOriginalId());
- final List titles = target.getTitle();
+ final List titles = target.getTitles();
if (titles.size() > 0) {
map.put("target_publication_title", titles.get(0));
}
- final long date = parseDateTolong(target.getDateofacceptance().getValue());
+ final long date = parseDateTolong(target.getPublicationdate());
if (date > 0) {
map.put("target_dateofacceptance", date);
}
- final List subjects = target.getSubject();
- if (subjects.size() > 0) {
- map
- .put(
- "target_publication_subject_list",
- subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
- }
-
- final List authors = target.getAuthor();
- if (authors.size() > 0) {
- map
- .put(
- "target_publication_author_list",
- authors.stream().map(Author::getFullname).collect(Collectors.toList()));
- }
+ map.put("target_publication_subject_list", target.getSubjects());
+ map.put("target_publication_author_list", target.getCreators());
// PROVENANCE INFO
map.put("trust", updateInfo.getTrust());
- final List sourceCollectedFrom = source.getCollectedfrom();
- if (sourceCollectedFrom.size() == 1) {
- map.put("provenance_datasource_id", sourceCollectedFrom.get(0).getKey());
- map.put("provenance_datasource_name", sourceCollectedFrom.get(0).getValue());
- }
+ map.put("provenance_datasource_id", source.getCollectedFromId());
+ map.put("provenance_datasource_name", source.getCollectedFromName());
map.put("provenance_publication_id_list", source.getOriginalId());
return map;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index 4d40ba80d..940d7f9f3 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -18,18 +18,20 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelationsAggregator;
import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -73,6 +75,8 @@ public class GenerateEventsApplication {
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final SparkConf conf = new SparkConf();
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.registerKryoClasses(BrokerConstants.getModelClasses());
final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
@@ -80,13 +84,16 @@ public class GenerateEventsApplication {
removeOutputDir(spark, eventsPath);
- final Dataset all = spark.emptyDataset(Encoders.kryo(Event.class));
-
- for (final Class extends Result> r1 : BrokerConstants.RESULT_CLASSES) {
- all.union(generateEvents(spark, graphPath, r1, dedupConfig));
- }
-
- all.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(eventsPath);
+ spark
+ .emptyDataset(Encoders.kryo(Event.class))
+ .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
+ .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
+ .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
+ .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(eventsPath);
});
}
@@ -101,18 +108,18 @@ public class GenerateEventsApplication {
final Class sourceClass,
final DedupConfig dedupConfig) {
- final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
+ final Dataset results = expandResultsWithRelations(spark, graphPath, sourceClass);
final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
- final TypedColumn, ResultGroup> aggr = new ResultAggregator()
+ final TypedColumn, ResultGroup> aggr = new ResultAggregator()
.toColumn();
return results
.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
- (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
+ (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
.map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid)
@@ -122,7 +129,7 @@ public class GenerateEventsApplication {
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
}
- private static Dataset expandResultsWithRelations(
+ private static Dataset expandResultsWithRelations(
final SparkSession spark,
final String graphPath,
final Class sourceClass) {
@@ -135,14 +142,15 @@ public class GenerateEventsApplication {
final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
- final Dataset r0 = readPath(
+ final Dataset r0 = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
.filter(r -> r.getDataInfo().getDeletedbyinference())
- .map(r -> new ResultWithRelations(r), Encoders.kryo(ResultWithRelations.class));
- final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
- final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
- final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
- final Dataset r4 = join(
+ .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
+
+ final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
+ final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
+ final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
+ final Dataset r4 = join(
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
;
@@ -159,20 +167,20 @@ public class GenerateEventsApplication {
Encoders.kryo(clazz));
}
- private static Dataset join(final Dataset sources,
+ private static Dataset join(final Dataset sources,
final Dataset rels,
final Dataset typedRels) {
- final TypedColumn, ResultWithRelations> aggr = new ResultWithRelationsAggregator()
+ final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator()
.toColumn();
;
return sources
.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
.groupByKey(
- (MapFunction, String>) t -> t._1.getResult().getId(), Encoders.STRING())
+ (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
- .map(t -> t._2, Encoders.kryo(ResultWithRelations.class));
+ .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
}
public static Dataset readPath(
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index 0e57f32f9..13aeefb2f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -12,22 +12,20 @@ import java.util.function.Function;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
-import eu.dnetlib.broker.objects.Publication;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.pace.config.DedupConfig;
public abstract class UpdateMatcher {
private final boolean multipleUpdate;
private final Function topicFunction;
- private final BiConsumer compileHighlightFunction;
+ private final BiConsumer compileHighlightFunction;
private final Function highlightToStringFunction;
public UpdateMatcher(final boolean multipleUpdate, final Function topicFunction,
- final BiConsumer compileHighlightFunction,
+ final BiConsumer compileHighlightFunction,
final Function highlightToStringFunction) {
this.multipleUpdate = multipleUpdate;
this.topicFunction = topicFunction;
@@ -35,19 +33,18 @@ public abstract class UpdateMatcher {
this.highlightToStringFunction = highlightToStringFunction;
}
- public Collection> searchUpdatesForRecord(final ResultWithRelations res,
- final Collection others,
+ public Collection> searchUpdatesForRecord(final OpenaireBrokerResult res,
+ final Collection others,
final DedupConfig dedupConfig) {
final Map> infoMap = new HashMap<>();
- for (final ResultWithRelations source : others) {
+ for (final OpenaireBrokerResult source : others) {
if (source != res) {
for (final T hl : findDifferences(source, res)) {
final Topic topic = getTopicFunction().apply(hl);
final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(),
- getHighlightToStringFunction(),
- dedupConfig);
+ getHighlightToStringFunction(), dedupConfig);
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else {
@@ -71,14 +68,14 @@ public abstract class UpdateMatcher {
}
}
- protected abstract List findDifferences(ResultWithRelations source, ResultWithRelations target);
+ protected abstract List findDifferences(OpenaireBrokerResult source, OpenaireBrokerResult target);
- protected static boolean isMissing(final List> list) {
- return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());
+ protected static boolean isMissing(final List list) {
+ return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
}
- protected boolean isMissing(final Field field) {
- return field == null || StringUtils.isBlank(field.getValue());
+ protected boolean isMissing(final String field) {
+ return StringUtils.isBlank(field);
}
public boolean isMultipleUpdate() {
@@ -89,7 +86,7 @@ public abstract class UpdateMatcher {
return topicFunction;
}
- public BiConsumer getCompileHighlightFunction() {
+ public BiConsumer getCompileHighlightFunction() {
return compileHighlightFunction;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
index a8e0a2c42..7a58f986b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java
@@ -5,45 +5,39 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import eu.dnetlib.broker.objects.Dataset;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
public abstract class AbstractEnrichMissingDataset
- extends UpdateMatcher {
+ extends UpdateMatcher {
public AbstractEnrichMissingDataset(final Topic topic) {
super(true,
rel -> topic,
(p, rel) -> p.getDatasets().add(rel),
- rel -> rel.getInstances().get(0).getUrl());
+ rel -> rel.getOriginalId());
}
protected abstract boolean filterByType(String relType);
@Override
- protected final List findDifferences(
- final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected final List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set existingDatasets = target
.getDatasets()
.stream()
.filter(rel -> filterByType(rel.getRelType()))
- .map(RelatedDataset::getRelDataset)
- .map(Dataset::getId)
+ .map(Dataset::getOriginalId)
.collect(Collectors.toSet());
return source
.getDatasets()
.stream()
.filter(rel -> filterByType(rel.getRelType()))
- .map(RelatedDataset::getRelDataset)
- .filter(d -> !existingDatasets.contains(d.getId()))
- .map(ConversionUtils::oafDatasetToBrokerDataset)
+ .filter(d -> !existingDatasets.contains(d.getOriginalId()))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
index ab4325c1e..fa5fde725 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
@@ -1,19 +1,15 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.broker.objects.Project;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-public class EnrichMissingProject
- extends UpdateMatcher {
+public class EnrichMissingProject extends UpdateMatcher {
public EnrichMissingProject() {
super(true,
@@ -23,16 +19,11 @@ public class EnrichMissingProject
}
@Override
- protected List findDifferences(final ResultWithRelations source, final ResultWithRelations target) {
- if (source.getProjects().isEmpty()) {
- return Arrays.asList();
+ protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) {
+ if (target.getProjects().isEmpty()) {
+ return source.getProjects();
} else {
- return target
- .getProjects()
- .stream()
- .map(RelatedProject::getRelProject)
- .map(ConversionUtils::oafProjectToBrokerProject)
- .collect(Collectors.toList());
+ return new ArrayList<>();
}
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
index 3bf23a36b..ca63aeb49 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
@@ -5,39 +5,38 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.Project;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Project;
-public class EnrichMoreProject extends UpdateMatcher {
+public class EnrichMoreProject extends UpdateMatcher {
public EnrichMoreProject() {
super(true,
prj -> Topic.ENRICH_MORE_PROJECT,
(p, prj) -> p.getProjects().add(prj),
- prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
+ prj -> projectAsString(prj));
+ }
+
+ private static String projectAsString(final Project prj) {
+ return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode();
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
- final Set existingProjects = source
+ final Set existingProjects = target
.getProjects()
.stream()
- .map(RelatedProject::getRelProject)
- .map(Project::getId)
+ .map(EnrichMoreProject::projectAsString)
.collect(Collectors.toSet());
- return target
+ return source
.getProjects()
.stream()
- .map(RelatedProject::getRelProject)
- .filter(p -> !existingProjects.contains(p.getId()))
- .map(ConversionUtils::oafProjectToBrokerProject)
+ .filter(p -> !existingProjects.contains(projectAsString(p)))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
index bba3e9648..300863949 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
@@ -5,21 +5,18 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.Publication;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-public abstract class AbstractEnrichMissingPublication
- extends UpdateMatcher {
+public abstract class AbstractEnrichMissingPublication extends UpdateMatcher {
public AbstractEnrichMissingPublication(final Topic topic) {
super(true,
rel -> topic,
(p, rel) -> p.getPublications().add(rel),
- rel -> rel.getInstances().get(0).getUrl());
+ rel -> rel.getOriginalId());
}
@@ -27,24 +24,21 @@ public abstract class AbstractEnrichMissingPublication
@Override
protected final List findDifferences(
- final ResultWithRelations source,
- final ResultWithRelations target) {
+ final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set existingPublications = target
.getPublications()
.stream()
.filter(rel -> filterByType(rel.getRelType()))
- .map(RelatedPublication::getRelPublication)
- .map(Publication::getId)
+ .map(Publication::getOriginalId)
.collect(Collectors.toSet());
return source
.getPublications()
.stream()
.filter(rel -> filterByType(rel.getRelType()))
- .map(RelatedPublication::getRelPublication)
- .filter(d -> !existingPublications.contains(d.getId()))
- .map(ConversionUtils::oafResultToBrokerPublication)
+ .filter(p -> !existingPublications.contains(p.getOriginalId()))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
index 6090939dc..76ae061e6 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
@@ -1,15 +1,12 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
public class EnrichMissingSoftware
extends UpdateMatcher {
@@ -23,18 +20,13 @@ public class EnrichMissingSoftware
@Override
protected List findDifferences(
- final ResultWithRelations source,
- final ResultWithRelations target) {
+ final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
- if (source.getSoftwares().isEmpty()) {
- return Arrays.asList();
+ if (target.getSoftwares().isEmpty()) {
+ return source.getSoftwares();
} else {
- return target
- .getSoftwares()
- .stream()
- .map(RelatedSoftware::getRelSoftware)
- .map(ConversionUtils::oafSoftwareToBrokerSoftware)
- .collect(Collectors.toList());
+ return new ArrayList<>();
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
index ba422f436..ebd421b8e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
@@ -5,15 +5,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.Software;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Software;
-public class EnrichMoreSoftware
- extends UpdateMatcher {
+public class EnrichMoreSoftware extends UpdateMatcher {
public EnrichMoreSoftware() {
super(true,
@@ -24,22 +21,19 @@ public class EnrichMoreSoftware
@Override
protected List findDifferences(
- final ResultWithRelations source,
- final ResultWithRelations target) {
+ final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set existingSoftwares = source
.getSoftwares()
.stream()
- .map(RelatedSoftware::getRelSoftware)
- .map(Software::getId)
+ .map(Software::getName)
.collect(Collectors.toSet());
return target
.getSoftwares()
.stream()
- .map(RelatedSoftware::getRelSoftware)
- .filter(p -> !existingSoftwares.contains(p.getId()))
- .map(ConversionUtils::oafSoftwareToBrokerSoftware)
+ .filter(p -> !existingSoftwares.contains(p.getName()))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
index 25d5f9d8a..b2cbbce2c 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAbstract.java
@@ -5,9 +5,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
public class EnrichMissingAbstract extends UpdateMatcher {
@@ -19,13 +19,12 @@ public class EnrichMissingAbstract extends UpdateMatcher {
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
- if (isMissing(target.getResult().getDescription()) && !isMissing(source.getResult().getDescription())) {
- return Arrays
- .asList(source.getResult().getDescription().get(0).getValue());
+ protected List findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) {
+ if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) {
+ return Arrays.asList(source.getAbstracts().get(0));
+ } else {
+ return new ArrayList<>();
}
- return new ArrayList<>();
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
index eed2d0be8..c4b96e67b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java
@@ -1,53 +1,43 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.broker.objects.Author;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
-public class EnrichMissingAuthorOrcid extends UpdateMatcher {
+public class EnrichMissingAuthorOrcid extends UpdateMatcher {
public EnrichMissingAuthorOrcid() {
super(true,
aut -> Topic.ENRICH_MISSING_AUTHOR_ORCID,
(p, aut) -> p.getCreators().add(aut),
- aut -> aut);
+ aut -> aut.getOrcid());
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set existingOrcids = target
- .getResult()
- .getAuthor()
+ .getCreators()
.stream()
- .map(Author::getPid)
- .flatMap(List::stream)
- .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
- .map(pid -> pid.getValue())
+ .map(Author::getOrcid)
+ .filter(StringUtils::isNotBlank)
.collect(Collectors.toSet());
- final List list = new ArrayList<>();
+ return source
+ .getCreators()
+ .stream()
+ .filter(a -> StringUtils.isNotBlank(a.getOrcid()))
+ .filter(a -> !existingOrcids.contains(a.getOrcid()))
+ .collect(Collectors.toList());
- for (final Author author : source.getResult().getAuthor()) {
- final String name = author.getFullname();
-
- for (final StructuredProperty pid : author.getPid()) {
- if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid")
- && !existingOrcids.contains(pid.getValue())) {
- list.add(name + " [ORCID: " + pid.getValue() + "]");
- }
- }
- }
-
- return list;
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
index e8ca18ae9..e870cf1fa 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
@@ -6,11 +6,10 @@ import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
public class EnrichMissingOpenAccess extends UpdateMatcher {
@@ -22,13 +21,12 @@ public class EnrichMissingOpenAccess extends UpdateMatcher {
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final long count = target
- .getResult()
- .getInstance()
+ .getInstances()
.stream()
- .map(i -> i.getAccessright().getClassid())
+ .map(Instance::getLicense)
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count();
@@ -37,12 +35,9 @@ public class EnrichMissingOpenAccess extends UpdateMatcher {
}
return source
- .getResult()
- .getInstance()
+ .getInstances()
.stream()
- .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
- .map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(List::stream)
+ .filter(i -> i.getLicense().equals(BrokerConstants.OPEN_ACCESS))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
index 6c061a2e8..cc72d9fa9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java
@@ -5,13 +5,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-public class EnrichMissingPid extends UpdateMatcher {
+public class EnrichMissingPid extends UpdateMatcher {
public EnrichMissingPid() {
super(true,
@@ -21,19 +20,17 @@ public class EnrichMissingPid extends UpdateMatcher {
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
- final long count = target.getResult().getPid().size();
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
+ final long count = target.getPids().size();
if (count > 0) {
return Arrays.asList();
}
return source
- .getResult()
- .getPid()
+ .getPids()
.stream()
- .map(ConversionUtils::oafPidToBrokerPid)
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
index 27a71740c..ed8c26b5a 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDate.java
@@ -5,9 +5,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
public class EnrichMissingPublicationDate extends UpdateMatcher {
@@ -19,13 +19,14 @@ public class EnrichMissingPublicationDate extends UpdateMatcher {
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
- if (isMissing(target.getResult().getDateofacceptance())
- && !isMissing(source.getResult().getDateofacceptance())) {
- return Arrays.asList(source.getResult().getDateofacceptance().getValue());
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
+
+ if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) {
+ return Arrays.asList(source.getPublicationdate());
+ } else {
+ return new ArrayList<>();
}
- return new ArrayList<>();
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
index 04de30089..07b1fa41a 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java
@@ -5,42 +5,38 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
-
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
-public class EnrichMissingSubject extends UpdateMatcher> {
+public class EnrichMissingSubject extends UpdateMatcher {
public EnrichMissingSubject() {
super(true,
- pair -> Topic.fromPath("ENRICH/MISSING/SUBJECT/" + pair.getLeft()),
- (p, pair) -> p.getSubjects().add(pair.getRight()),
- pair -> pair.getLeft() + "::" + pair.getRight());
+ s -> Topic.fromPath("ENRICH/MISSING/SUBJECT/" + s.getType()),
+ (p, s) -> p.getSubjects().add(s),
+ s -> subjectAsString(s));
}
@Override
- protected List> findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
- final Set existingTypes = target
- .getResult()
- .getSubject()
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
+ final Set existingSubject = target
+ .getSubjects()
.stream()
- .map(StructuredProperty::getQualifier)
- .map(Qualifier::getClassid)
+ .map(s -> subjectAsString(s))
.collect(Collectors.toSet());
return source
- .getResult()
- .getPid()
+ .getSubjects()
.stream()
- .filter(pid -> !existingTypes.contains(pid.getQualifier().getClassid()))
- .map(ConversionUtils::oafSubjectToPair)
+ .filter(s -> !existingSubject.contains(subjectAsString(s)))
.collect(Collectors.toList());
}
+ private static String subjectAsString(final TypedValue s) {
+ return s.getType() + "::" + s.getValue();
+ }
+
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
index 3ee4c8bdd..bfef3ee4f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
@@ -6,11 +6,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
public class EnrichMoreOpenAccess extends UpdateMatcher {
@@ -22,24 +21,19 @@ public class EnrichMoreOpenAccess extends UpdateMatcher {
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set urls = target
- .getResult()
- .getInstance()
+ .getInstances()
.stream()
- .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
+ .filter(i -> i.getLicense().equals(BrokerConstants.OPEN_ACCESS))
.map(i -> i.getUrl())
- .flatMap(List::stream)
.collect(Collectors.toSet());
return source
- .getResult()
- .getInstance()
+ .getInstances()
.stream()
- .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
- .map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(List::stream)
+ .filter(i -> i.getLicense().equals(BrokerConstants.OPEN_ACCESS))
.filter(i -> !urls.contains(i.getUrl()))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
index 694c9460a..d1f2e6022 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java
@@ -5,38 +5,37 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-public class EnrichMorePid extends UpdateMatcher {
+public class EnrichMorePid extends UpdateMatcher {
public EnrichMorePid() {
super(true,
pid -> Topic.ENRICH_MORE_PID,
(p, pid) -> p.getPids().add(pid),
- pid -> pid.getType() + "::" + pid.getValue());
+ pid -> pidAsString(pid));
}
@Override
- protected List findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set existingPids = target
- .getResult()
- .getPid()
+ .getPids()
.stream()
- .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
+ .map(pid -> pidAsString(pid))
.collect(Collectors.toSet());
return source
- .getResult()
- .getPid()
+ .getPids()
.stream()
- .filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
- .map(ConversionUtils::oafPidToBrokerPid)
+ .filter(pid -> !existingPids.contains(pidAsString(pid)))
.collect(Collectors.toList());
}
+ private static String pidAsString(final TypedValue pid) {
+ return pid.getType() + "::" + pid.getValue();
+ }
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
index 7d0b1a65e..39225e8ab 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java
@@ -5,39 +5,37 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.Pair;
-
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
-import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-public class EnrichMoreSubject extends UpdateMatcher> {
+public class EnrichMoreSubject extends UpdateMatcher {
public EnrichMoreSubject() {
super(true,
- pair -> Topic.fromPath("ENRICH/MORE/SUBJECT/" + pair.getLeft()),
- (p, pair) -> p.getSubjects().add(pair.getRight()),
- pair -> pair.getLeft() + "::" + pair.getRight());
+ s -> Topic.fromPath("ENRICH/MORE/SUBJECT/" + s.getType()),
+ (p, s) -> p.getSubjects().add(s),
+ s -> subjectAsString(s));
}
@Override
- protected List> findDifferences(final ResultWithRelations source,
- final ResultWithRelations target) {
+ protected List findDifferences(final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target) {
final Set existingSubjects = target
- .getResult()
- .getSubject()
+ .getSubjects()
.stream()
- .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
+ .map(pid -> subjectAsString(pid))
.collect(Collectors.toSet());
return source
- .getResult()
- .getPid()
+ .getPids()
.stream()
- .filter(pid -> !existingSubjects.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
- .map(ConversionUtils::oafSubjectToPair)
+ .filter(s -> !existingSubjects.contains(subjectAsString(s)))
.collect(Collectors.toList());
}
+ private static String subjectAsString(final TypedValue s) {
+ return s.getType() + "::" + s.getValue();
+ }
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
index 0665c69dd..49c46c7f0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
@@ -2,13 +2,12 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
public class BrokerConstants {
@@ -18,7 +17,11 @@ public class BrokerConstants {
public static final float MIN_TRUST = 0.25f;
public static final float MAX_TRUST = 1.00f;
- public static final List> RESULT_CLASSES = Arrays
- .asList(Publication.class, Dataset.class, Software.class, OtherResearchProduct.class);
+ public static Class>[] getModelClasses() {
+ final Set> list = new HashSet<>();
+ list.addAll(Arrays.asList(ModelSupport.getOafModelClasses()));
+ list.addAll(Arrays.asList(ResultGroup.class, Event.class));
+ return list.toArray(new Class[] {});
+ }
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index 2b39115b1..b80848682 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -13,7 +13,8 @@ import org.dom4j.DocumentHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
@@ -41,8 +42,8 @@ public class ConversionUtils {
}).collect(Collectors.toList());
}
- public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
- return sp != null ? new Pid()
+ public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) {
+ return sp != null ? new TypedValue()
.setValue(sp.getValue())
.setType(sp.getQualifier().getClassid()) : null;
}
@@ -54,7 +55,7 @@ public class ConversionUtils {
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
return d != null ? new eu.dnetlib.broker.objects.Dataset()
.setOriginalId(d.getOriginalId().get(0))
- .setTitles(structPropList(d.getTitle()))
+ .setTitle(structPropValue(d.getTitle()))
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances(
d
@@ -63,26 +64,30 @@ public class ConversionUtils {
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(List::stream)
.collect(Collectors.toList()))
- .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
+ .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
: null;
}
- public static final eu.dnetlib.broker.objects.Publication oafResultToBrokerPublication(final Result result) {
+ public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
- return result != null ? new eu.dnetlib.broker.objects.Publication()
+ return result != null ? new OpenaireBrokerResult()
+ .setOpenaireId(result.getId())
.setOriginalId(result.getOriginalId().get(0))
+ .setTypology(result.getResulttype().getClassid())
.setTitles(structPropList(result.getTitle()))
.setAbstracts(fieldList(result.getDescription()))
.setLanguage(result.getLanguage().getClassid())
- .setSubjects(structPropList(result.getSubject()))
- .setCreators(result.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList()))
- .setPublicationdate(result.getDateofcollection())
+ .setSubjects(structPropTypedList(result.getSubject()))
+ .setCreators(
+ result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
+ .setPublicationdate(result.getDateofacceptance().getValue())
.setPublisher(fieldValue(result.getPublisher()))
.setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
.setContributor(fieldList(result.getContributor()))
.setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
- .setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
+ .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null))
+ .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances(
result
@@ -100,6 +105,30 @@ public class ConversionUtils {
: null;
}
+ private static List structPropTypedList(final List list) {
+ return list
+ .stream()
+ .map(
+ p -> new TypedValue()
+ .setValue(p.getValue())
+ .setType(p.getQualifier().getClassid()))
+ .collect(Collectors.toList());
+ }
+
+ private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
+ return author != null ? new eu.dnetlib.broker.objects.Author()
+ .setFullname(author.getFullname())
+ .setOrcid(
+ author
+ .getPid()
+ .stream()
+ .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
+ .map(pid -> pid.getValue())
+ .findFirst()
+ .orElse(null))
+ : null;
+ }
+
private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
return journal != null ? new eu.dnetlib.broker.objects.Journal()
.setName(journal.getName())
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index b4de08db7..4c20ac5ca 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList;
import java.util.List;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
@@ -30,7 +31,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder {
@@ -68,7 +68,7 @@ public class EventFinder {
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
final List> list = new ArrayList<>();
- for (final ResultWithRelations target : results.getData()) {
+ for (final OpenaireBrokerResult target : results.getData()) {
for (final UpdateMatcher> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 9da636413..82d017864 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -1,7 +1,6 @@
package eu.dnetlib.dhp.broker.oa.util;
-import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -10,14 +9,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.broker.objects.Provenance;
-import eu.dnetlib.broker.objects.Publication;
import eu.dnetlib.dhp.broker.model.Topic;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.tree.support.TreeProcessor;
@@ -29,11 +25,11 @@ public final class UpdateInfo {
private final T highlightValue;
- private final ResultWithRelations source;
+ private final OpenaireBrokerResult source;
- private final ResultWithRelations target;
+ private final OpenaireBrokerResult target;
- private final BiConsumer compileHighlight;
+ private final BiConsumer compileHighlight;
private final Function highlightToString;
@@ -41,9 +37,9 @@ public final class UpdateInfo {
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
- public UpdateInfo(final Topic topic, final T highlightValue, final ResultWithRelations source,
- final ResultWithRelations target,
- final BiConsumer compileHighlight,
+ public UpdateInfo(final Topic topic, final T highlightValue, final OpenaireBrokerResult source,
+ final OpenaireBrokerResult target,
+ final BiConsumer compileHighlight,
final Function highlightToString,
final DedupConfig dedupConfig) {
this.topic = topic;
@@ -52,22 +48,23 @@ public final class UpdateInfo {
this.target = target;
this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString;
- this.trust = calculateTrust(dedupConfig, source.getResult(), target.getResult());
+ this.trust = calculateTrust(dedupConfig, source, target);
}
public T getHighlightValue() {
return highlightValue;
}
- public ResultWithRelations getSource() {
+ public OpenaireBrokerResult getSource() {
return source;
}
- public ResultWithRelations getTarget() {
+ public OpenaireBrokerResult getTarget() {
return target;
}
- private float calculateTrust(final DedupConfig dedupConfig, final Result r1, final Result r2) {
+ private float calculateTrust(final DedupConfig dedupConfig, final OpenaireBrokerResult r1,
+ final OpenaireBrokerResult r2) {
try {
final ObjectMapper objectMapper = new ObjectMapper();
final MapDocument doc1 = MapDocumentUtil
@@ -103,26 +100,18 @@ public final class UpdateInfo {
public OpenAireEventPayload asBrokerPayload() {
- final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource().getResult());
- compileHighlight.accept(p, getHighlightValue());
+ compileHighlight.accept(target, getHighlightValue());
- final Publication hl = new Publication();
+ final OpenaireBrokerResult hl = new OpenaireBrokerResult();
compileHighlight.accept(hl, getHighlightValue());
- final String provId = getSource().getResult().getOriginalId().stream().findFirst().orElse(null);
- final String provRepo = getSource()
- .getResult()
- .getCollectedfrom()
- .stream()
- .map(KeyValue::getValue)
- .findFirst()
- .orElse(null);
+ final String provId = getSource().getOriginalId();
+ final String provRepo = getSource().getCollectedFromName();
+
final String provUrl = getSource()
- .getResult()
- .getInstance()
+ .getInstances()
.stream()
.map(Instance::getUrl)
- .flatMap(List::stream)
.findFirst()
.orElse(null);
;
@@ -130,7 +119,7 @@ public final class UpdateInfo {
final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);
return new OpenAireEventPayload()
- .setPublication(p)
+ .setPublication(target)
.setHighlight(hl)
.setTrust(trust)
.setProvenance(provenance);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java
index 397f30660..dabe2bb4d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultAggregator.java
@@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
-public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> {
+public class ResultAggregator extends Aggregator, ResultGroup, ResultGroup> {
/**
*
@@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator t) {
+ public ResultGroup reduce(final ResultGroup group, final Tuple2 t) {
return group.addElement(t._1);
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java
index 81f189d62..4308224a5 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/simple/ResultGroup.java
@@ -5,7 +5,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
public class ResultGroup implements Serializable {
@@ -14,13 +14,13 @@ public class ResultGroup implements Serializable {
*/
private static final long serialVersionUID = -3360828477088669296L;
- private final List data = new ArrayList<>();
+ private final List data = new ArrayList<>();
- public List getData() {
+ public List getData() {
return data;
}
- public ResultGroup addElement(final ResultWithRelations elem) {
+ public ResultGroup addElement(final OpenaireBrokerResult elem) {
data.add(elem);
return this;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java
new file mode 100644
index 000000000..b44fbe367
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/OpenaireBrokerResultAggregator.java
@@ -0,0 +1,69 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.broker.objects.OpenaireBrokerResult;
+import scala.Tuple2;
+
+public class OpenaireBrokerResultAggregator
+ extends Aggregator, OpenaireBrokerResult, OpenaireBrokerResult> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -3687878788861013488L;
+
+ @Override
+ public OpenaireBrokerResult zero() {
+ return new OpenaireBrokerResult();
+ }
+
+ @Override
+ public OpenaireBrokerResult finish(final OpenaireBrokerResult g) {
+ return g;
+ }
+
+ @Override
+ public OpenaireBrokerResult reduce(final OpenaireBrokerResult g, final Tuple2 t) {
+ if (g.getOriginalId() == null) {
+ return t._1;
+ } else if (t._2 instanceof RelatedSoftware) {
+ g.getSoftwares().add(((RelatedSoftware) t._2).getRelSoftware());
+ } else if (t._2 instanceof RelatedDataset) {
+ g.getDatasets().add(((RelatedDataset) t._2).getRelDataset());
+ } else if (t._2 instanceof RelatedPublication) {
+ g.getPublications().add(((RelatedPublication) t._2).getRelPublication());
+ } else if (t._2 instanceof RelatedProject) {
+ g.getProjects().add(((RelatedProject) t._2).getRelProject());
+ }
+ return g;
+
+ }
+
+ @Override
+ public OpenaireBrokerResult merge(final OpenaireBrokerResult g1, final OpenaireBrokerResult g2) {
+ if (g1.getOriginalId() != null) {
+ g1.getSoftwares().addAll(g2.getSoftwares());
+ g1.getDatasets().addAll(g2.getDatasets());
+ g1.getPublications().addAll(g2.getPublications());
+ g1.getProjects().addAll(g2.getProjects());
+ return g1;
+ } else {
+ return g2;
+ }
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.kryo(OpenaireBrokerResult.class);
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.kryo(OpenaireBrokerResult.class);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java
index 84cf693ad..fcf1b89b1 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDataset.java
@@ -1,10 +1,16 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
+import java.io.Serializable;
-public class RelatedDataset {
+import eu.dnetlib.broker.objects.Dataset;
+public class RelatedDataset implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 774487705184038324L;
private final String source;
private final String relType;
private final Dataset relDataset;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java
index 490724a44..08d57a1ea 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java
@@ -1,15 +1,17 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Project;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.broker.objects.Dataset;
+import eu.dnetlib.broker.objects.Project;
+import eu.dnetlib.broker.objects.Publication;
+import eu.dnetlib.broker.objects.Software;
public class RelatedEntityFactory {
@SuppressWarnings("unchecked")
- public static