diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 119031b064..1a219c5c91 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -57,9 +57,9 @@
- <groupId>eu.dnetlib</groupId>
+ <groupId>eu.dnetlib.dhp</groupId>
 <artifactId>dnet-openaire-broker-common</artifactId>
- <version>[3.0.4,4.0.0)</version>
+ <version>[3.0.0-SNAPSHOT,)</version>
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
index a2d92e149e..cfee360c57 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -18,8 +18,6 @@ import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
@@ -27,9 +25,6 @@ import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
public class GenerateEventsJob {
@@ -52,12 +47,6 @@ public class GenerateEventsJob {
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
- final String isLookupUrl = parser.get("isLookupUrl");
- log.info("isLookupUrl: {}", isLookupUrl);
-
- final String dedupConfigProfileId = parser.get("dedupConfProfile");
- log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
-
final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath);
@@ -72,10 +61,6 @@ public class GenerateEventsJob {
final SparkConf conf = new SparkConf();
- // TODO UNCOMMENT
- // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
- final DedupConfig dedupConfig = null;
-
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, eventsPath);
@@ -90,7 +75,7 @@ public class GenerateEventsJob {
final Dataset<Event> dataset = groups
.map(
g -> EventFinder
- .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators),
+ .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
Encoders
.bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
@@ -112,23 +97,4 @@ public class GenerateEventsJob {
}
- private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
-
- final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
-
- final String conf = isLookUpService
- .getResourceProfileByQuery(
- String
- .format(
- "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
- profId));
-
- final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
- dedupConfig.getPace().initModel();
- dedupConfig.getPace().initTranslationMap();
- // dedupConfig.getWf().setConfigurationId("???");
-
- return dedupConfig;
- }
-
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
new file mode 100644
index 0000000000..a51601cd76
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
@@ -0,0 +1,63 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
+
+public class GenerateStatsJob {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);
+
+ public static void main(final String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ IndexOnESJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final SparkConf conf = new SparkConf();
+
+ final String eventsPath = parser.get("workingPath") + "/events";
+ log.info("eventsPath: {}", eventsPath);
+
+ final String statsPath = parser.get("workingPath") + "/stats";
+ log.info("stats: {}", statsPath);
+
+ final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
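+ // group the events by their target datasource id and aggregate them into per-datasource topic counts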
+ final Dataset<DatasourceStats> stats = ClusterUtils
+ .readPath(spark, eventsPath, Event.class)
+ .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING())
+ .agg(aggr)
+ .map(t -> t._2, Encoders.bean(DatasourceStats.class));
+
+ ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
+ });
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
index cdcf0add4a..55ab497f07 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
@@ -7,7 +7,6 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
@@ -65,9 +64,7 @@ public class JoinStep2Job {
final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
- .groupByKey(
- (MapFunction, String>) t -> t._1.getOpenaireId(),
- Encoders.STRING())
+ .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index af6ab30a1a..3d688fa1d1 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -17,7 +17,6 @@ import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
-import eu.dnetlib.pace.config.DedupConfig;
public abstract class UpdateMatcher<T> {
@@ -37,7 +36,6 @@ public abstract class UpdateMatcher {
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
final Collection<OaBrokerMainEntity> others,
- final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
@@ -49,7 +47,7 @@ public abstract class UpdateMatcher {
if (topic != null) {
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res,
getCompileHighlightFunction(),
- getHighlightToStringFunction(), dedupConfig);
+ getHighlightToStringFunction());
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index 593e66d434..b6328eb954 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -37,7 +37,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
-import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder {
@@ -76,7 +75,6 @@ public class EventFinder {
final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist,
- final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final List<UpdateInfo<?>> list = new ArrayList<>();
@@ -84,7 +82,7 @@ public class EventFinder {
for (final OaBrokerMainEntity target : results.getData()) {
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
for (final UpdateMatcher<?> matcher : matchers) {
- list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
+ list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators));
}
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java
index 5338d4f3d7..72fe1b204e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java
@@ -1,8 +1,62 @@
package eu.dnetlib.dhp.broker.oa.util;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.tree.support.TreeProcessor;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+
public class TrustUtils {
+ private static final Logger log = LoggerFactory.getLogger(TrustUtils.class);
+
+ private static DedupConfig dedupConfig;
+
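+ // load the dedup configuration once from the classpath; if it cannot be loaded, calculateTrust falls back to MIN_TRUST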
+ static {
+ final ObjectMapper mapper = new ObjectMapper();
+ try {
+ dedupConfig = mapper
+ .readValue(
+ DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"),
+ DedupConfig.class);
+ } catch (final IOException e) {
+ log.error("Error loading dedupConfig, e");
+ }
+
+ }
+
+ protected static float calculateTrust(final OaBrokerMainEntity r1, final OaBrokerMainEntity r2) {
+
+ if (dedupConfig == null) {
+ return BrokerConstants.MIN_TRUST;
+ }
+
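+ // serialize both records as JSON, compare them with the dedup decision tree and rescale the resulting score into the trust range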
+ try {
+ final ObjectMapper objectMapper = new ObjectMapper();
+ final MapDocument doc1 = MapDocumentUtil
+ .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
+ final MapDocument doc2 = MapDocumentUtil
+ .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
+
+ final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
+
+ final double threshold = dedupConfig.getWf().getThreshold();
+
+ return TrustUtils.rescale(score, threshold);
+ } catch (final Exception e) {
+ log.error("Error computing score between results", e);
+ return BrokerConstants.MIN_TRUST;
+ }
+ }
+
public static float rescale(final double score, final double threshold) {
if (score >= BrokerConstants.MAX_TRUST) {
return BrokerConstants.MAX_TRUST;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 0586b681ea..ef8fb240c4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -4,20 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.function.BiConsumer;
import java.util.function.Function;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProvenance;
import eu.dnetlib.dhp.broker.model.Topic;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.tree.support.TreeProcessor;
-import eu.dnetlib.pace.util.MapDocumentUtil;
public final class UpdateInfo<T> {
@@ -35,20 +26,17 @@ public final class UpdateInfo {
private final float trust;
- private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
-
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
final OaBrokerMainEntity target,
final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
- final Function<T, String> highlightToString,
- final DedupConfig dedupConfig) {
+ final Function<T, String> highlightToString) {
this.topic = topic;
this.highlightValue = highlightValue;
this.source = source;
this.target = target;
this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString;
- this.trust = calculateTrust(dedupConfig, source, target);
+ this.trust = TrustUtils.calculateTrust(source, target);
}
public T getHighlightValue() {
@@ -63,31 +51,6 @@ public final class UpdateInfo {
return target;
}
- private float calculateTrust(final DedupConfig dedupConfig,
- final OaBrokerMainEntity r1,
- final OaBrokerMainEntity r2) {
-
- if (dedupConfig == null) {
- return BrokerConstants.MIN_TRUST;
- }
-
- try {
- final ObjectMapper objectMapper = new ObjectMapper();
- final MapDocument doc1 = MapDocumentUtil
- .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1));
- final MapDocument doc2 = MapDocumentUtil
- .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2));
-
- final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2);
- final double threshold = dedupConfig.getWf().getThreshold();
-
- return TrustUtils.rescale(score, threshold);
- } catch (final Exception e) {
- log.error("Error computing score between results", e);
- return BrokerConstants.MIN_TRUST;
- }
- }
-
protected Topic getTopic() {
return topic;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
new file mode 100644
index 0000000000..8b628809db
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
@@ -0,0 +1,61 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DatasourceStats implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -282112564184047677L;
+
+ private String id;
+ private String name;
+ private String type;
+ private Map<String, Long> topics = new HashMap<>();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ public Map<String, Long> getTopics() {
+ return topics;
+ }
+
+ public void setTopics(final Map<String, Long> topics) {
+ this.topics = topics;
+ }
+
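+ // increment the counter for the given topic, creating the entry on first use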
+ public void incrementTopic(final String topic, final long inc) {
+ if (topics.containsKey(topic)) {
+ topics.put(topic, topics.get(topic) + inc);
+ } else {
+ topics.put(topic, inc);
+ }
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
new file mode 100644
index 0000000000..5aa6698e39
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.dhp.broker.model.Event;
+
+public class StatsAggregator extends Aggregator<Event, DatasourceStats, DatasourceStats> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6652105853037330529L;
+
+ @Override
+ public DatasourceStats zero() {
+ return new DatasourceStats();
+ }
+
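+ // fold a single event into the running stats of its target datasource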
+ @Override
+ public DatasourceStats reduce(final DatasourceStats stats, final Event e) {
+ stats.setId(e.getMap().getTargetDatasourceId());
+ stats.setName(e.getMap().getTargetDatasourceName());
+ stats.setType(e.getMap().getTargetDatasourceType());
+ stats.incrementTopic(e.getTopic(), 1L);
+ return stats;
+ }
+
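+ // merge two partial aggregations, summing up the per-topic counters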
+ @Override
+ public DatasourceStats merge(final DatasourceStats stats0, final DatasourceStats stats1) {
+ if (StringUtils.isBlank(stats0.getId())) {
+ stats0.setId(stats1.getId());
+ stats0.setName(stats1.getName());
+ stats0.setType(stats1.getType());
+ }
+ stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue()));
+ return stats0;
+ }
+
+ @Override
+ public Encoder<DatasourceStats> bufferEncoder() {
+ return Encoders.bean(DatasourceStats.class);
+
+ }
+
+ @Override
+ public DatasourceStats finish(final DatasourceStats stats) {
+ return stats;
+ }
+
+ @Override
+ public Encoder<DatasourceStats> outputEncoder() {
+ return Encoders.bean(DatasourceStats.class);
+
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json
new file mode 100644
index 0000000000..d0319b441f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json
@@ -0,0 +1,122 @@
+{
+ "wf": {
+
+ },
+ "pace": {
+ "clustering": [
+ {
+ "name": "wordssuffixprefix",
+ "fields": [
+ "title"
+ ],
+ "params": {
+ "max": "2",
+ "len": "3"
+ }
+ },
+ {
+ "name": "lowercase",
+ "fields": [
+ "doi"
+ ],
+ "params": {
+
+ }
+ }
+ ],
+ "decisionTree": {
+ "start": {
+ "fields": [
+ {
+ "field": "doi",
+ "comparator": "exactMatch",
+ "weight": 1.0,
+ "countIfUndefined": "false",
+ "params": {
+
+ }
+ }
+ ],
+ "threshold": 0.5,
+ "aggregation": "AVG",
+ "positive": "MATCH",
+ "negative": "layer1",
+ "undefined": "layer1",
+ "ignoreUndefined": "true"
+ },
+ "layer1": {
+ "fields": [
+ {
+ "field": "title",
+ "comparator": "titleVersionMatch",
+ "weight": 0.9,
+ "countIfUndefined": "false",
+ "params": {
+
+ }
+ },
+ {
+ "field": "authors",
+ "comparator": "sizeMatch",
+ "weight": 0.9,
+ "countIfUndefined": "false",
+ "params": {
+
+ }
+ }
+ ],
+ "threshold": 0.5,
+ "aggregation": "AVG",
+ "positive": "MATCH",
+ "negative": "layer2",
+ "undefined": "layer2",
+ "ignoreUndefined": "true"
+ },
+ "layer2": {
+ "fields": [
+ {
+ "field": "title",
+ "comparator": "levensteinTitle",
+ "weight": 1.0,
+ "countIfUndefined": "true",
+ "params": {
+
+ }
+ }
+ ],
+ "threshold": 0.99,
+ "aggregation": "AVG",
+ "positive": "MATCH",
+ "negative": "NO_MATCH",
+ "undefined": "NO_MATCH",
+ "ignoreUndefined": "true"
+ }
+ },
+ "model": [
+ {
+ "name": "doi",
+ "type": "String",
+ "path": "$.pids[?(@.type == 'doi')].value"
+ },
+ {
+ "name": "title",
+ "type": "String",
+ "path": "$.titles",
+ "length": 250,
+ "size": 5
+ },
+ {
+ "name": "authors",
+ "type": "List",
+ "path": "$.creators[*].fullname",
+ "size": 200
+ }
+ ],
+ "blacklists": {
+
+ },
+ "synonyms": {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index b85c60fdf7..2c728cd981 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -8,14 +8,6 @@
<name>workingPath</name>
<description>the path where the generated data will be stored</description>
</property>
- <property>
- <name>isLookupUrl</name>
- <description>the address of the lookUp service</description>
- </property>
- <property>
- <name>dedupConfProfId</name>
- <description>the id of a valid Dedup Configuration Profile</description>
- </property>
<property>
<name>datasourceIdWhitelist</name>
@@ -427,8 +419,6 @@
--conf spark.sql.shuffle.partitions=3840
<arg>--workingPath</arg><arg>${workingPath}</arg>
- <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
- <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
index c545884f9e..bab8081932 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
@@ -5,18 +5,6 @@
"paramDescription": "the path where the generated events will be stored",
"paramRequired": true
},
- {
- "paramName": "lu",
- "paramLongName": "isLookupUrl",
- "paramDescription": "the address of the ISLookUpService",
- "paramRequired": true
- },
- {
- "paramName": "d",
- "paramLongName": "dedupConfProfile",
- "paramDescription": "the id of a valid Dedup Configuration Profile",
- "paramRequired": true
- },
{
"paramName": "datasourceIdWhitelist",
"paramLongName": "datasourceIdWhitelist",
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index 392271260e..b4155f93ff 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -9,15 +9,6 @@
<name>workingPath</name>
<description>the path where the generated data will be stored</description>
</property>
- <property>
- <name>isLookupUrl</name>
- <description>the address of the lookUp service</description>
- </property>
- <property>
- <name>dedupConfProfId</name>
- <description>the id of a valid Dedup Configuration Profile</description>
- </property>
-
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@@ -73,19 +64,19 @@
-
+
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-
+
<master>yarn</master>
<mode>cluster</mode>
- <name>Count</name>
- <class>eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob</class>
+ <name>GenerateStatsJob</name>
+ <class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
--executor-cores=${sparkExecutorCores}
diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
index 93bc5617fb..82374b335f 100644
--- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
+++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
@@ -30,7 +30,7 @@ class UpdateMatcherTest {
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection<UpdateInfo<String>> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+ .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -46,7 +46,7 @@ class UpdateMatcherTest {
res.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+ .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -62,7 +62,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+ .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1);
}
@@ -79,7 +79,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+ .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -98,7 +98,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+ .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -117,7 +117,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+ .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1);
}
diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java
index bb23d6085e..974baa28b4 100644
--- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java
+++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java
@@ -5,6 +5,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
+import eu.dnetlib.broker.objects.OaBrokerAuthor;
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerTypedValue;
+
public class TrustUtilsTest {
private static final double THRESHOLD = 0.95;
@@ -64,6 +68,23 @@ public class TrustUtilsTest {
verifyValue(2.00, BrokerConstants.MAX_TRUST);
}
+ @Test
+ public void test() throws Exception {
+ final OaBrokerMainEntity r1 = new OaBrokerMainEntity();
+ r1.getTitles().add("D-NET Service Package: Data Import");
+ r1.getPids().add(new OaBrokerTypedValue("doi", "123"));
+ r1.getCreators().add(new OaBrokerAuthor("Michele Artini", null));
+ r1.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null));
+
+ final OaBrokerMainEntity r2 = new OaBrokerMainEntity();
+ r2.getTitles().add("D-NET Service Package: Data Import");
+ // r2.getPids().add(new OaBrokerTypedValue("doi", "123"));
+ r2.getCreators().add(new OaBrokerAuthor("Michele Artini", null));
+ // r2.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null));
+
+ System.out.println("TRUST: " + TrustUtils.calculateTrust(r1, r2));
+ }
+
private void verifyValue(final double originalScore, final float expectedTrust) {
final float trust = TrustUtils.rescale(originalScore, THRESHOLD);
System.out.println(trust);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
index 0b718ecdc3..3fb9d17512 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
@@ -47,10 +47,11 @@ public class EntityMergerTest implements Serializable {
@Test
public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
- List<Tuple2<String, Software>> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class);
+ List<Tuple2<String, Software>> softwares = readSample(
+ testEntityBasePath + "/software_merge.json", Software.class);
Software merged = DedupRecordFactory
- .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
+ .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index e1c4b53b50..7091d97403 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -97,7 +98,7 @@ public class CleanGraphSparkJob {
.json(outputPath);
}
- private static <T extends Oaf> T fixDefaults(T value) {
+ protected static <T extends Oaf> T fixDefaults(T value) {
if (value instanceof Datasource) {
// nothing to clean here
} else if (value instanceof Project) {
@@ -134,11 +135,6 @@ public class CleanGraphSparkJob {
.setResourcetype(
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
}
- if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
- r
- .setBestaccessright(
- qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
- }
if (Objects.nonNull(r.getInstance())) {
for (Instance i : r.getInstance()) {
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
@@ -152,6 +148,16 @@ public class CleanGraphSparkJob {
}
}
}
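+ // derive the best access right from the (already cleaned) instances; fall back to UNKNOWN when none can be computed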
+ if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
+ Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
+ if (Objects.isNull(bestaccessrights)) {
+ r
+ .setBestaccessright(
+ qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
+ } else {
+ r.setBestaccessright(bestaccessrights);
+ }
+ }
if (Objects.nonNull(r.getAuthor())) {
boolean nullRank = r
.getAuthor()
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index fc77950d01..c43ee29fe0 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -378,6 +378,10 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Field<String> prepareDatasetStorageDate(Document doc, DataInfo info);
+ public static Qualifier createBestAccessRights(final List<Instance> instanceList) {
+ return getBestAccessRights(instanceList);
+ }
+
protected static Qualifier getBestAccessRights(final List<Instance> instanceList) {
if (instanceList != null) {
final Optional<Qualifier> min = instanceList
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
index 575f9229ed..eea8d0a5ab 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
@@ -16,6 +16,11 @@
<name>postgresPassword</name>
<description>the postgres password</description>
</property>
+ <property>
+ <name>dbSchema</name>
+ <value>beta</value>
+ <description>the database schema according to the D-Net infrastructure (beta or production)</description>
+ </property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
@@ -93,6 +98,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+ <arg>--dbschema</arg><arg>${dbSchema}</arg>
@@ -109,6 +115,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+ <arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--action</arg><arg>claims</arg>
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
index 4783aa81f6..559a30b1ea 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
@@ -57,6 +57,8 @@ public class CleaningFunctionTest {
String json = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/result.json"));
Publication p_in = MAPPER.readValue(json, Publication.class);
+ assertNull(p_in.getBestaccessright());
+
assertTrue(p_in instanceof Result);
assertTrue(p_in instanceof Publication);
@@ -84,6 +86,9 @@ public class CleaningFunctionTest {
.map(p -> p.getQualifier())
.allMatch(q -> pidTerms.contains(q.getClassid())));
+ Publication p_defaults = CleanGraphSparkJob.fixDefaults(p_out);
+ assertEquals("CLOSED", p_defaults.getBestaccessright().getClassid());
+
// TODO add more assertions to verity the cleaned values
System.out.println(MAPPER.writeValueAsString(p_out));
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json
index 2c1d5017d6..5d0c0d1ed8 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json
@@ -185,12 +185,7 @@
"surname": ""
}
],
- "bestaccessright": {
- "classid": "CLOSED",
- "classname": "Closed Access",
- "schemeid": "dnet:access_modes",
- "schemename": "dnet:access_modes"
- },
+ "bestaccessright": null,
"collectedfrom": [
{
"key": "10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747",
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
index 80b8000173..57dca7bb15 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -9,6 +9,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache();
- Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
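+ // materialize the related entities to an intermediate parquet path and re-read them, instead of keeping the mapping in the same execution plan as the join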
+ final String relatedEntityPath = outputPath + "_relatedEntity";
+ readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false")
.map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
Encoders.kryo(RelatedEntity.class))
+ .repartition(5000)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(relatedEntityPath);
+
+ Dataset<Tuple2<String, RelatedEntity>> entities = spark
+ .read()
+ .load(relatedEntityPath)
+ .as(Encoders.kryo(RelatedEntity.class))
.map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
@@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Result result = (Result) entity;
if (result.getTitle() != null && !result.getTitle().isEmpty()) {
- re.setTitle(result.getTitle().stream().findFirst().get());
+ final StructuredProperty title = result.getTitle().stream().findFirst().get();
+ title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
+ re.setTitle(title);
}
re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(result.getResulttype());
- re.setInstances(result.getInstance());
+ re
+ .setInstances(
+ result
+ .getInstance()
+ .stream()
+ .limit(ProvisionConstants.MAX_INSTANCES)
+ .collect(Collectors.toList()));
// TODO still to be mapped
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
index bfcc648a35..7e175121e5 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final int MAX_EXTERNAL_ENTITIES = 50;
- private static final int MAX_AUTHORS = 200;
- private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
- private static final int MAX_TITLE_LENGTH = 5000;
- private static final int MAX_ABSTRACT_LENGTH = 100000;
-
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List<ExternalReference> refs = r
.getExternalReference()
.stream()
- .limit(MAX_EXTERNAL_ENTITIES)
+ .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList());
r.setExternalReference(refs);
}
if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) {
- a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH));
- if (authors.size() < MAX_AUTHORS || hasORCID(a)) {
+ a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
+ if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
authors.add(a);
}
}
@@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(d -> {
- d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH));
+ d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
@@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(t -> {
- t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH));
+ t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
return t;
})
+ .limit(ProvisionConstants.MAX_TITLES)
.collect(Collectors.toList());
r.setTitle(titles);
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
index eb63d4423d..da0a810217 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -100,11 +100,17 @@ public class PrepareRelationsJob {
.orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter);
- int maxRelations = Optional
- .ofNullable(parser.get("maxRelations"))
+ int sourceMaxRelations = Optional
+ .ofNullable(parser.get("sourceMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
- log.info("maxRelations: {}", maxRelations);
+ log.info("sourceMaxRelations: {}", sourceMaxRelations);
+
+ int targetMaxRelations = Optional
+ .ofNullable(parser.get("targetMaxRelations"))
+ .map(Integer::valueOf)
+ .orElse(MAX_RELS);
+ log.info("targetMaxRelations: {}", targetMaxRelations);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -116,7 +122,8 @@ public class PrepareRelationsJob {
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDD(
- spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
+ spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
+ relPartitions);
});
}
@@ -129,31 +136,40 @@ public class PrepareRelationsJob {
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
- * @param maxRelations maximum number of allowed outgoing edges
+ * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
+ * @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
* @param relPartitions number of partitions for the output RDD
*/
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
- Set<String> relationFilter, int maxRelations, int relPartitions) {
+ Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
- // group by SOURCE and apply limit
- RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
+ JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
- .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
- .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
+ .filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
+
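+ // prune the relations in two passes: first keep at most sourceMaxRelations per source id, then at most targetMaxRelations per target id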
+ JavaRDD<Relation> pruned = pruneRels(
+ pruneRels(
+ rels,
+ sourceMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
+ targetMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
+ spark
+ .createDataset(pruned.rdd(), Encoders.bean(Relation.class))
+ .repartition(relPartitions)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outputPath);
+ }
+
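+ // keep at most maxRelations relations per key (computed by idFn), sorting each group according to the relation class weights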
+ private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
+ int relPartitions, Function<Relation, String> idFn) {
+ return rels
+ .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator)
- .map(Tuple2::_2)
- .rdd();
-
- spark
- .createDataset(bySource, Encoders.bean(Relation.class))
- .repartition(relPartitions)
- .write()
- .mode(SaveMode.Overwrite)
- .parquet(outputPath);
+ .map(Tuple2::_2);
}
// experimental
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
new file mode 100644
index 0000000000..9bc3706cdd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
@@ -0,0 +1,14 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
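+// Upper bounds applied while building the records to be indexed (external references, authors, titles, abstracts, instances and field lengths).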
+public class ProvisionConstants {
+
+ public static final int MAX_EXTERNAL_ENTITIES = 50;
+ public static final int MAX_AUTHORS = 200;
+ public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
+ public static final int MAX_TITLE_LENGTH = 5000;
+ public static final int MAX_TITLES = 10;
+ public static final int MAX_ABSTRACT_LENGTH = 100000;
+ public static final int MAX_INSTANCES = 10;
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
index bf7f9330d1..bd7b4d78ee 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
@@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
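+ // a lower weight sorts first, so these relation classes are the ones kept when PrepareRelationsJob prunes the relations of an entity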
- weights.put("outcome", 0);
- weights.put("supplement", 1);
- weights.put("review", 2);
- weights.put("citation", 3);
- weights.put("affiliation", 4);
- weights.put("relationship", 5);
- weights.put("publicationDataset", 6);
- weights.put("similarity", 7);
+ weights.put("participation", 0);
- weights.put("provision", 8);
- weights.put("participation", 9);
- weights.put("dedup", 10);
+ weights.put("outcome", 1);
+ weights.put("affiliation", 2);
+ weights.put("dedup", 3);
+ weights.put("publicationDataset", 4);
+ weights.put("citation", 5);
+ weights.put("supplement", 6);
+ weights.put("review", 7);
+ weights.put("relationship", 8);
+ weights.put("provision", 9);
+ weights.put("similarity", 10);
}
private static final long serialVersionUID = 3232323;
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
index 71b2becc4d..33fa1dc8df 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
@@ -30,9 +30,16 @@
"paramRequired": false
},
{
- "paramName": "mr",
- "paramLongName": "maxRelations",
- "paramDescription": "maximum number of relations allowed for a each entity",
+ "paramName": "smr",
+ "paramLongName": "sourceMaxRelations",
+ "paramDescription": "maximum number of relations allowed for a each entity grouping by source",
+ "paramRequired": false
+ },
+ {
+ "paramName": "tmr",
+ "paramLongName": "targetMaxRelations",
+ "paramDescription": "maximum number of relations allowed for a each entity grouping by target",
"paramRequired": false
}
+
]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index faa81ad644..32bf7ce83d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -18,8 +18,12 @@
<description>filter applied reading relations (by relClass)</description>
</property>
<property>
- <name>maxRelations</name>
- <description>maximum number of relations allowed for a each entity</description>
+ <name>sourceMaxRelations</name>
+ <description>maximum number of relations allowed for each entity, grouping by source</description>
+ </property>
+ <property>
+ <name>targetMaxRelations</name>
+ <description>maximum number of relations allowed for each entity, grouping by target</description>
</property>
<property>
<name>otherDsTypeId</name>
@@ -133,7 +137,8 @@
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
- <arg>--maxRelations</arg><arg>${maxRelations}</arg>
+ <arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
+ <arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
<arg>--relationFilter</arg><arg>${relationFilter}</arg>
<arg>--relPartitions</arg><arg>5000</arg>
@@ -166,7 +171,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -193,7 +198,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -220,7 +225,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -247,7 +252,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -274,7 +279,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -301,7 +306,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -328,7 +333,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -367,7 +372,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=15360
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
@@ -395,7 +400,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
@@ -423,7 +428,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
@@ -451,7 +456,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
@@ -479,7 +484,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=8000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
@@ -507,7 +512,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
@@ -535,7 +540,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
@@ -607,5 +612,4 @@
-
\ No newline at end of file