diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 1a219c5c91..f98708c648 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -59,7 +59,6 @@
eu.dnetlib.dhp
dnet-openaire-broker-common
- [3.0.0-SNAPSHOT,)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 49e7506981..4a58cfd36d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -11,6 +11,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
+import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory {
@@ -52,9 +54,11 @@ public class EventFactory {
final OaBrokerMainEntity source = updateInfo.getSource();
final OaBrokerMainEntity target = updateInfo.getTarget();
- map.setTargetDatasourceId(target.getCollectedFromId());
- map.setTargetDatasourceName(target.getCollectedFromName());
- map.setTargetDatasourceType(target.getCollectedFromType());
+ final OaBrokerRelatedDatasource targetDs = updateInfo.getTargetDs();
+
+ map.setTargetDatasourceId(targetDs.getOpenaireId());
+ map.setTargetDatasourceName(targetDs.getName());
+ map.setTargetDatasourceType(targetDs.getType());
map.setTargetResultId(target.getOpenaireId());
@@ -73,11 +77,19 @@ public class EventFactory {
// PROVENANCE INFO
map.setTrust(updateInfo.getTrust());
- map.setProvenanceDatasourceId(source.getCollectedFromId());
- map.setProvenanceDatasourceName(source.getCollectedFromName());
- map.setProvenanceDatasourceType(source.getCollectedFromType());
map.setProvenanceResultId(source.getOpenaireId());
+ source
+ .getDatasources()
+ .stream()
+ .filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
+ .findFirst()
+ .ifPresent(ds -> {
+ map.setProvenanceDatasourceId(ds.getOpenaireId());
+ map.setProvenanceDatasourceName(ds.getName());
+ map.setProvenanceDatasourceType(ds.getType());
+ });
+
return map;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
new file mode 100644
index 0000000000..a51601cd76
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
@@ -0,0 +1,63 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.TypedColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
+
+public class GenerateStatsJob {
+
+ private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);
+
+ public static void main(final String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ IndexOnESJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final SparkConf conf = new SparkConf();
+
+ final String eventsPath = parser.get("workingPath") + "/events";
+ log.info("eventsPath: {}", eventsPath);
+
+ final String statsPath = parser.get("workingPath") + "/stats";
+ log.info("stats: {}", statsPath);
+
+ final TypedColumn aggr = new StatsAggregator().toColumn();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ final Dataset stats = ClusterUtils
+ .readPath(spark, eventsPath, Event.class)
+ .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING())
+ .agg(aggr)
+ .map(t -> t._2, Encoders.bean(DatasourceStats.class));
+
+ ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
+ });
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
index eb1825fa5a..39fa76e43d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
@@ -17,8 +17,8 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasourceAggregator;
import scala.Tuple2;
public class JoinStep0Job {
@@ -45,33 +45,33 @@ public class JoinStep0Job {
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
- final String outputPath = workingPath + "/joinedEntities_step0";
- log.info("outputPath: {}", outputPath);
+ final String joinedEntitiesPath = workingPath + "/joinedEntities_step0";
+ log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
- ClusterUtils.removeDir(spark, outputPath);
+ ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
- final Dataset datasources = ClusterUtils
- .readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class);
+ final Dataset typedRels = ClusterUtils
+ .readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class);
- final TypedColumn, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator()
+ final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
.toColumn();
final Dataset dataset = sources
- .joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner")
+ .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
- ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total);
+ ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
});
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
index cdcf0add4a..55ab497f07 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
@@ -7,7 +7,6 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
@@ -65,9 +64,7 @@ public class JoinStep2Job {
final Dataset dataset = sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
- .groupByKey(
- (MapFunction, String>) t -> t._1.getOpenaireId(),
- Encoders.STRING())
+ .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
index 30f5ddac33..166372a7f4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
@@ -9,14 +9,23 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
-import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
+import eu.dnetlib.dhp.broker.oa.util.DatasourceRelationsAccumulator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
+import scala.Tuple3;
public class PrepareRelatedDatasourcesJob {
@@ -42,7 +51,7 @@ public class PrepareRelatedDatasourcesJob {
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
- final String relsPath = workingPath + "/datasources";
+ final String relsPath = workingPath + "/relatedDatasources";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
@@ -53,16 +62,46 @@ public class PrepareRelatedDatasourcesJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
- final Dataset dataset = ClusterUtils
- .readPath(spark, graphPath + "/datasource", Datasource.class)
- .map(
- ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()),
- Encoders.bean(SimpleDatasourceInfo.class));
+ final Dataset> rels = prepareResultTuples(
+ spark, graphPath, Publication.class)
+ .union(prepareResultTuples(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
+ .union(prepareResultTuples(spark, graphPath, Software.class))
+ .union(prepareResultTuples(spark, graphPath, OtherResearchProduct.class));
- ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total);
+ final Dataset datasources = ClusterUtils
+ .readPath(spark, graphPath + "/datasource", Datasource.class)
+ .map(ConversionUtils::oafDatasourceToBrokerDatasource, Encoders.bean(OaBrokerRelatedDatasource.class));
+
+ final Dataset dataset = rels
+ .joinWith(datasources, datasources.col("openaireId").equalTo(rels.col("_2")), "inner")
+ .map(t -> {
+ final RelatedDatasource r = new RelatedDatasource();
+ r.setSource(t._1._1());
+ r.setRelDatasource(t._2);
+ r.getRelDatasource().setRelType(t._1._3());
+ return r;
+ }, Encoders.bean(RelatedDatasource.class));
+
+ ClusterUtils.save(dataset, relsPath, RelatedDatasource.class, total);
});
}
+ private static final Dataset> prepareResultTuples(final SparkSession spark,
+ final String graphPath,
+ final Class extends Result> sourceClass) {
+
+ return ClusterUtils
+ .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
+ .filter(r -> !ClusterUtils.isDedupRoot(r.getId()))
+ .filter(r -> r.getDataInfo().getDeletedbyinference())
+ .map(
+ r -> DatasourceRelationsAccumulator.calculateTuples(r),
+ Encoders.bean(DatasourceRelationsAccumulator.class))
+ .flatMap(
+ acc -> acc.getRels().iterator(),
+ Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
+ }
+
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index 3d688fa1d1..fba82aa8cc 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
@@ -34,18 +35,19 @@ public abstract class UpdateMatcher {
this.highlightToStringFunction = highlightToStringFunction;
}
- public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res,
+ public Collection> searchUpdatesForRecord(final OaBrokerMainEntity target,
+ final OaBrokerRelatedDatasource targetDs,
final Collection others,
final Map accumulators) {
final Map> infoMap = new HashMap<>();
for (final OaBrokerMainEntity source : others) {
- if (source != res) {
- for (final T hl : findDifferences(source, res)) {
+ if (source != target) {
+ for (final T hl : findDifferences(source, target)) {
final Topic topic = getTopicFunction().apply(hl);
if (topic != null) {
- final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res,
+ final UpdateInfo info = new UpdateInfo<>(topic, hl, source, target, targetDs,
getCompileHighlightFunction(),
getHighlightToStringFunction());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
index 5308b9dffb..7a09862d8c 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java
@@ -14,6 +14,10 @@ public class BrokerConstants {
public static final String OPEN_ACCESS = "OPEN";
public static final String IS_MERGED_IN_CLASS = "isMergedIn";
+ public static final String COLLECTED_FROM_REL = "collectedFrom";
+
+ public static final String HOSTED_BY_REL = "hostedBy";
+
public static final float MIN_TRUST = 0.25f;
public static final float MAX_TRUST = 1.00f;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index d00c5b817d..053627a5fb 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -22,11 +22,13 @@ import eu.dnetlib.broker.objects.OaBrokerJournal;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.ExternalReference;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Instance;
@@ -119,8 +121,6 @@ public class ConversionUtils {
res
.setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
- res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
- res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
res
@@ -223,6 +223,18 @@ public class ConversionUtils {
return res;
}
+ public static final OaBrokerRelatedDatasource oafDatasourceToBrokerDatasource(final Datasource ds) {
+ if (ds == null) {
+ return null;
+ }
+
+ final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource();
+ res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname())));
+ res.setOpenaireId(ds.getId());
+ res.setType(classId(ds.getDatasourcetype()));
+ return res;
+ }
+
private static String first(final List list) {
return list != null && list.size() > 0 ? list.get(0) : null;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java
new file mode 100644
index 0000000000..75c4625ce4
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java
@@ -0,0 +1,68 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import eu.dnetlib.dhp.schema.oaf.Result;
+import scala.Tuple3;
+
+public class DatasourceRelationsAccumulator implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 3256220670651218957L;
+
+ private List> rels = new ArrayList<>();
+
+ public List> getRels() {
+ return rels;
+ }
+
+ public void setRels(final List> rels) {
+ this.rels = rels;
+ }
+
+ protected void addTuple(final Tuple3 t) {
+ rels.add(t);
+ }
+
+ public static final DatasourceRelationsAccumulator calculateTuples(final Result r) {
+
+ final Set collectedFromSet = r
+ .getCollectedfrom()
+ .stream()
+ .map(kv -> kv.getKey())
+ .filter(StringUtils::isNotBlank)
+ .distinct()
+ .collect(Collectors.toSet());
+
+ final Set hostedBySet = r
+ .getInstance()
+ .stream()
+ .map(i -> i.getHostedby())
+ .filter(Objects::nonNull)
+ .filter(kv -> !StringUtils.equalsIgnoreCase(kv.getValue(), "Unknown Repository"))
+ .map(kv -> kv.getKey())
+ .filter(StringUtils::isNotBlank)
+ .distinct()
+ .filter(id -> !collectedFromSet.contains(id))
+ .collect(Collectors.toSet());
+
+ final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
+ collectedFromSet
+ .stream()
+ .map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL))
+ .forEach(res::addTuple);
+ hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
+ return res;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index b6328eb954..1ab56cc346 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -11,6 +11,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
@@ -80,9 +81,11 @@ public class EventFinder {
final List> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) {
- if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
- for (final UpdateMatcher> matcher : matchers) {
- list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators));
+ for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
+ if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
+ for (final UpdateMatcher> matcher : matchers) {
+ list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
+ }
}
}
}
@@ -90,17 +93,17 @@ public class EventFinder {
return asEventGroup(list);
}
- private static boolean verifyTarget(final OaBrokerMainEntity target,
+ private static boolean verifyTarget(final OaBrokerRelatedDatasource target,
final Set dsIdWhitelist,
final Set dsIdBlacklist,
final Set dsTypeWhitelist) {
- if (dsIdWhitelist.contains(target.getCollectedFromId())) {
+ if (dsIdWhitelist.contains(target.getOpenaireId())) {
return true;
- } else if (dsIdBlacklist.contains(target.getCollectedFromId())) {
+ } else if (dsIdBlacklist.contains(target.getOpenaireId())) {
return false;
} else {
- return dsTypeWhitelist.contains(target.getCollectedFromType());
+ return dsTypeWhitelist.contains(target.getType());
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index ef8fb240c4..fca954247d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -8,6 +8,7 @@ import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProvenance;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.Topic;
public final class UpdateInfo {
@@ -20,6 +21,8 @@ public final class UpdateInfo {
private final OaBrokerMainEntity target;
+ private final OaBrokerRelatedDatasource targetDs;
+
private final BiConsumer compileHighlight;
private final Function highlightToString;
@@ -28,12 +31,14 @@ public final class UpdateInfo {
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
final OaBrokerMainEntity target,
+ final OaBrokerRelatedDatasource targetDs,
final BiConsumer compileHighlight,
final Function highlightToString) {
this.topic = topic;
this.highlightValue = highlightValue;
this.source = source;
this.target = target;
+ this.targetDs = targetDs;
this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString;
this.trust = TrustUtils.calculateTrust(source, target);
@@ -51,6 +56,10 @@ public final class UpdateInfo {
return target;
}
+ public OaBrokerRelatedDatasource getTargetDs() {
+ return targetDs;
+ }
+
protected Topic getTopic() {
return topic;
}
@@ -75,8 +84,20 @@ public final class UpdateInfo {
compileHighlight.accept(hl, getHighlightValue());
final String provId = getSource().getOpenaireId();
- final String provRepo = getSource().getCollectedFromName();
- final String provType = getSource().getCollectedFromType();
+ final String provRepo = getSource()
+ .getDatasources()
+ .stream()
+ .filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
+ .map(ds -> ds.getName())
+ .findFirst()
+ .orElse("");
+ final String provType = getSource()
+ .getDatasources()
+ .stream()
+ .filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
+ .map(ds -> ds.getType())
+ .findFirst()
+ .orElse("");
final String provUrl = getSource()
.getInstances()
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
new file mode 100644
index 0000000000..8b628809db
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
@@ -0,0 +1,61 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DatasourceStats implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -282112564184047677L;
+
+ private String id;
+ private String name;
+ private String type;
+ private Map topics = new HashMap<>();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ public Map getTopics() {
+ return topics;
+ }
+
+ public void setTopics(final Map topics) {
+ this.topics = topics;
+ }
+
+ public void incrementTopic(final String topic, final long inc) {
+ if (topics.containsKey(topic)) {
+ topics.put(topic, topics.get(topic) + inc);
+ } else {
+ topics.put(topic, inc);
+ }
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
new file mode 100644
index 0000000000..5aa6698e39
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.dhp.broker.model.Event;
+
+public class StatsAggregator extends Aggregator {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 6652105853037330529L;
+
+ @Override
+ public DatasourceStats zero() {
+ return new DatasourceStats();
+ }
+
+ @Override
+ public DatasourceStats reduce(final DatasourceStats stats, final Event e) {
+ stats.setId(e.getMap().getTargetDatasourceId());
+ stats.setName(e.getMap().getTargetDatasourceName());
+ stats.setType(e.getMap().getTargetDatasourceType());
+ stats.incrementTopic(e.getTopic(), 1l);
+ return stats;
+ }
+
+ @Override
+ public DatasourceStats merge(final DatasourceStats stats0, final DatasourceStats stats1) {
+ if (StringUtils.isBlank(stats0.getId())) {
+ stats0.setId(stats1.getId());
+ stats0.setName(stats1.getName());
+ stats0.setType(stats1.getType());
+ }
+ stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue()));
+ return stats0;
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.bean(DatasourceStats.class);
+
+ }
+
+ @Override
+ public DatasourceStats finish(final DatasourceStats stats) {
+ return stats;
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.bean(DatasourceStats.class);
+
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java
new file mode 100644
index 0000000000..a27df502bf
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
+
+import java.io.Serializable;
+
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
+
+public class RelatedDatasource implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 3015550240920424010L;
+
+ private String source;
+ private OaBrokerRelatedDatasource relDatasource;
+
+ public RelatedDatasource() {
+ }
+
+ public RelatedDatasource(final String source, final OaBrokerRelatedDatasource relDatasource) {
+ this.source = source;
+ this.relDatasource = relDatasource;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(final String source) {
+ this.source = source;
+ }
+
+ public OaBrokerRelatedDatasource getRelDatasource() {
+ return relDatasource;
+ }
+
+ public void setRelDatasource(final OaBrokerRelatedDatasource relDatasource) {
+ this.relDatasource = relDatasource;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasourceAggregator.java
similarity index 55%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasourceAggregator.java
index ccd15c8c6e..2c0c7917d9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasourceAggregator.java
@@ -7,15 +7,16 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2;
-public class AddDatasourceTypeAggregator
- extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> {
+public class RelatedDatasourceAggregator
+ extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
*/
- private static final long serialVersionUID = 8788588975496014728L;
+ private static final long serialVersionUID = -7212121913834713672L;
@Override
public OaBrokerMainEntity zero() {
@@ -29,10 +30,10 @@ public class AddDatasourceTypeAggregator
@Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
- final Tuple2 t) {
+ final Tuple2 t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
- if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) {
- res.setCollectedFromType(t._2.getType());
+ if (t._2 != null && res.getDatasources().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
+ res.getDatasources().add(t._2.getRelDatasource());
}
return res;
@@ -40,7 +41,15 @@ public class AddDatasourceTypeAggregator
@Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
- if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) {
+ if (StringUtils.isNotBlank(g1.getOpenaireId())) {
+ final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasources().size();
+ if (availables > 0) {
+ if (g2.getDatasources().size() <= availables) {
+ g1.getDatasources().addAll(g2.getDatasources());
+ } else {
+ g1.getDatasources().addAll(g2.getDatasources().subList(0, availables));
+ }
+ }
return g1;
} else {
return g2;
@@ -56,4 +65,5 @@ public class AddDatasourceTypeAggregator
public Encoder outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
+
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java
deleted file mode 100644
index 966f63fa0f..0000000000
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java
+++ /dev/null
@@ -1,40 +0,0 @@
-
-package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
-
-import java.io.Serializable;
-
-public class SimpleDatasourceInfo implements Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = 2996609859416024734L;
-
- private String id;
- private String type;
-
- public SimpleDatasourceInfo() {
- }
-
- public SimpleDatasourceInfo(final String id, final String type) {
- this.id = id;
- this.type = type;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(final String id) {
- this.id = id;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(final String type) {
- this.type = type;
- }
-
-}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 2c728cd981..568d5dc5a0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -448,6 +448,30 @@
--index${esIndexName}
--esHost${esIndexHost}
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateStatsJob
+ eu.dnetlib.dhp.broker.oa.GenerateStatsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index d19ad6c5a0..2271a9e0e9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -64,19 +64,214 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
+
+
yarn
cluster
- Count
- eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob
+ JoinStep0
+ eu.dnetlib.dhp.broker.oa.JoinStep0Job
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ JoinStep1
+ eu.dnetlib.dhp.broker.oa.JoinStep1Job
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ JoinStep2
+ eu.dnetlib.dhp.broker.oa.JoinStep2Job
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ JoinStep3
+ eu.dnetlib.dhp.broker.oa.JoinStep3Job
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ JoinStep4
+ eu.dnetlib.dhp.broker.oa.JoinStep4Job
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareGroupsJob
+ eu.dnetlib.dhp.broker.oa.PrepareGroupsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphPath${graphInputPath}
+ --workingPath${workingPath}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateEventsJob
+ eu.dnetlib.dhp.broker.oa.GenerateEventsJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --workingPath${workingPath}
+ --datasourceIdWhitelist${datasourceIdWhitelist}
+ --datasourceTypeWhitelist${datasourceTypeWhitelist}
+ --datasourceIdBlacklist${datasourceIdBlacklist}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ IndexOnESJob
+ eu.dnetlib.dhp.broker.oa.IndexOnESJob
+ dhp-broker-events-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.dynamicAllocation.maxExecutors="8"
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --workingPath${workingPath}
+ --index${esIndexName}
+ --esHost${esIndexHost}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ GenerateStatsJob
+ eu.dnetlib.dhp.broker.oa.GenerateStatsJob
dhp-broker-events-${projectVersion}.jar
--executor-cores=${sparkExecutorCores}
diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
index 82374b335f..8fa95abe5f 100644
--- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
+++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
@@ -8,15 +8,23 @@ import java.util.Collection;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+@ExtendWith(MockitoExtension.class)
class UpdateMatcherTest {
UpdateMatcher matcher = new EnrichMissingPublicationDate();
+ @Mock
+ private OaBrokerRelatedDatasource targetDs;
+
@BeforeEach
void setUp() throws Exception {
}
@@ -30,7 +38,7 @@ class UpdateMatcherTest {
final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
+ .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -46,7 +54,7 @@ class UpdateMatcherTest {
res.setPublicationdate("2018");
final Collection> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
+ .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -62,7 +70,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018");
final Collection> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
+ .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1);
}
@@ -79,7 +87,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018");
final Collection> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
+ .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -98,7 +106,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018");
final Collection> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
+ .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty());
}
@@ -117,7 +125,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018");
final Collection> list = matcher
- .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null);
+ .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1);
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 2120da0808..74cecb7b6b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -28,6 +28,8 @@ import eu.dnetlib.pace.config.DedupConfig;
abstract class AbstractSparkAction implements Serializable {
+ protected static final int NUM_PARTITIONS = 1000;
+
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
index 222794d64d..01065510ae 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
@@ -100,6 +100,11 @@ public class DedupUtility {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
}
+ public static String createBlockStatsPath(
+ final String basePath, final String actionSetId, final String entityType) {
+ return String.format("%s/%s/%s_blockstats", basePath, actionSetId, entityType);
+ }
+
public static List getConfigurations(String isLookUpUrl, String orchestrator)
throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
index 180f9f8460..68201677e5 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java
@@ -52,6 +52,7 @@ public class Deduper implements Serializable {
.collect(Collectors.toList())
.iterator())
.mapToPair(block -> new Tuple2<>(block.getKey(), block))
- .reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize));
+ .reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize))
+ .filter(b -> b._2().getDocuments().size() > 1);
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
new file mode 100644
index 0000000000..6fb7b844bb
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
@@ -0,0 +1,57 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.util.Objects;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class RelationAggregator extends Aggregator {
+
+ private static Relation ZERO = new Relation();
+
+ @Override
+ public Relation zero() {
+ return ZERO;
+ }
+
+ @Override
+ public Relation reduce(Relation b, Relation a) {
+ return mergeRel(b, a);
+ }
+
+ @Override
+ public Relation merge(Relation b, Relation a) {
+ return mergeRel(b, a);
+ }
+
+ @Override
+ public Relation finish(Relation r) {
+ return r;
+ }
+
+ private Relation mergeRel(Relation b, Relation a) {
+ if (Objects.equals(b, ZERO)) {
+ return a;
+ }
+ if (Objects.equals(a, ZERO)) {
+ return b;
+ }
+
+ b.mergeFrom(a);
+ return b;
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.kryo(Relation.class);
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.kryo(Relation.class);
+ }
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
new file mode 100644
index 0000000000..1e13485e5a
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkBlockStats.java
@@ -0,0 +1,126 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.dedup.model.Block;
+import eu.dnetlib.dhp.oa.dedup.model.BlockStats;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import scala.Tuple2;
+
+public class SparkBlockStats extends AbstractSparkAction {
+
+ private static final Logger log = LoggerFactory.getLogger(SparkBlockStats.class);
+
+ public SparkBlockStats(ArgumentApplicationParser parser, SparkSession spark) {
+ super(parser, spark);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkBlockStats.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
+ parser.parseArgument(args);
+
+ SparkConf conf = new SparkConf();
+
+ new SparkBlockStats(parser, getSparkSession(conf))
+ .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+ }
+
+ public Long computeComparisons(Long blockSize, Long slidingWindowSize) {
+
+ if (slidingWindowSize >= blockSize)
+ return (slidingWindowSize * (slidingWindowSize - 1)) / 2;
+ else {
+ return (blockSize - slidingWindowSize + 1) * (slidingWindowSize * (slidingWindowSize - 1)) / 2;
+ }
+ }
+
+ @Override
+ public void run(ISLookUpService isLookUpService)
+ throws DocumentException, IOException, ISLookUpException {
+
+ // read oozie parameters
+ final String graphBasePath = parser.get("graphBasePath");
+ final String isLookUpUrl = parser.get("isLookUpUrl");
+ final String actionSetId = parser.get("actionSetId");
+ final String workingPath = parser.get("workingPath");
+ final int numPartitions = Optional
+ .ofNullable(parser.get("numPartitions"))
+ .map(Integer::valueOf)
+ .orElse(NUM_PARTITIONS);
+
+ log.info("graphBasePath: '{}'", graphBasePath);
+ log.info("isLookUpUrl: '{}'", isLookUpUrl);
+ log.info("actionSetId: '{}'", actionSetId);
+ log.info("workingPath: '{}'", workingPath);
+
+ // for each dedup configuration
+ for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
+
+ final String subEntity = dedupConf.getWf().getSubEntityValue();
+ log.info("Creating blockstats for: '{}'", subEntity);
+
+ final String outputPath = DedupUtility.createBlockStatsPath(workingPath, actionSetId, subEntity);
+ removeOutputDir(spark, outputPath);
+
+ JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ JavaPairRDD mapDocuments = sc
+ .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+ .repartition(numPartitions)
+ .mapToPair(
+ (PairFunction) s -> {
+ MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
+ return new Tuple2<>(d.getIdentifier(), d);
+ });
+
+ // create blocks for deduplication
+ JavaRDD blockStats = Deduper
+ .createSortedBlocks(mapDocuments, dedupConf)
+ .repartition(numPartitions)
+ .map(b -> asBlockStats(dedupConf, b));
+
+ // save the blockstats in the workingdir
+ spark
+ .createDataset(blockStats.rdd(), Encoders.bean(BlockStats.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .save(outputPath);
+ }
+ }
+
+ private BlockStats asBlockStats(DedupConfig dedupConf, Tuple2 b) {
+ return new BlockStats(
+ b._1(),
+ (long) b._2().getDocuments().size(),
+ computeComparisons(
+ (long) b._2().getDocuments().size(), (long) dedupConf.getWf().getSlidingWindowSize()));
+ }
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index c0503d991d..6d625cd117 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -5,11 +5,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
@@ -75,7 +77,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
-
+ int cut = Optional
+ .ofNullable(parser.get("cutConnectedComponent"))
+ .map(Integer::valueOf)
+ .orElse(0);
+ log.info("connected component cut: '{}'", cut);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
@@ -100,8 +106,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final RDD> edgeRdd = spark
.read()
- .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
- .as(Encoders.bean(Relation.class))
+ .textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
+ .map(
+ (MapFunction) r -> OBJECT_MAPPER.readValue(r, Relation.class),
+ Encoders.bean(Relation.class))
.javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd();
@@ -109,7 +117,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final Dataset mergeRels = spark
.createDataset(
GraphProcessor
- .findCCs(vertexes.rdd(), edgeRdd, maxIterations)
+ .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
.toJavaRDD()
.filter(k -> k.getDocIds().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf))
@@ -117,6 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
Encoders.bean(Relation.class));
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
+
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index 2cfe2e0801..b3ee47bfc2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@@ -48,13 +49,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
parser.parseArgument(args);
SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf
- .registerKryoClasses(
- new Class[] {
- MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
- });
-
new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@@ -68,7 +62,12 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
+ final int numPartitions = Optional
+ .ofNullable(parser.get("numPartitions"))
+ .map(Integer::valueOf)
+ .orElse(NUM_PARTITIONS);
+ log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
@@ -88,6 +87,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
JavaPairRDD mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+ .repartition(numPartitions)
.mapToPair(
(PairFunction) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
@@ -95,19 +95,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
});
// create blocks for deduplication
- JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
+ JavaPairRDD blocks = Deduper
+ .createSortedBlocks(mapDocuments, dedupConf)
+ .repartition(numPartitions);
// create relations by comparing only elements in the same group
- JavaRDD relations = Deduper
+ Deduper
.computeRelations(sc, blocks, dedupConf)
- .map(t -> createSimRel(t._1(), t._2(), entity));
-
- // save the simrel in the workingdir
- spark
- .createDataset(relations.rdd(), Encoders.bean(Relation.class))
- .write()
- .mode(SaveMode.Append)
- .save(outputPath);
+ .map(t -> createSimRel(t._1(), t._2(), entity))
+ .repartition(numPartitions)
+ .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+ .saveAsTextFile(outputPath);
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 5168085116..03e6674e4d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
@@ -95,7 +99,24 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET,
getDeletedFn());
- save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite);
+ save(
+ distinctRelations(
+ newRels
+ .union(updated)
+ .union(mergeRels)
+ .map((MapFunction) r -> r, Encoders.kryo(Relation.class))),
+ outputRelationPath, SaveMode.Overwrite);
+ }
+
+ private Dataset distinctRelations(Dataset rels) {
+ return rels
+ .filter(getRelationFilterFunction())
+ .groupByKey(
+ (MapFunction) r -> String
+ .join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
+ Encoders.STRING())
+ .agg(new RelationAggregator().toColumn())
+ .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class));
}
private static Dataset processDataset(
@@ -112,6 +133,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map(mapFn, Encoders.bean(Relation.class));
}
+ private FilterFunction getRelationFilterFunction() {
+ return (FilterFunction) r -> StringUtils.isNotBlank(r.getSource()) ||
+ StringUtils.isNotBlank(r.getTarget()) ||
+ StringUtils.isNotBlank(r.getRelClass()) ||
+ StringUtils.isNotBlank(r.getSubRelType()) ||
+ StringUtils.isNotBlank(r.getRelClass());
+ }
+
private static MapFunction patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index bfd2c25e21..cd4f99f634 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph;
import java.io.IOException;
import java.io.Serializable;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore;
@@ -18,12 +19,17 @@ public class ConnectedComponent implements Serializable {
private Set docIds;
private String ccId;
- public ConnectedComponent() {
- }
-
- public ConnectedComponent(Set docIds) {
+ public ConnectedComponent(Set docIds, final int cut) {
this.docIds = docIds;
createID();
+ if (cut > 0 && docIds.size() > cut) {
+ this.docIds = docIds
+ .stream()
+ .filter(s -> !ccId.equalsIgnoreCase(s))
+ .limit(cut - 1)
+ .collect(Collectors.toSet());
+ this.docIds.add(ccId);
+ }
}
public String createID() {
@@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable {
public String getMin() {
final StringBuilder min = new StringBuilder();
+
docIds
.forEach(
i -> {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
index e19bb7ff58..f4dd85d758 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
@@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
object GraphProcessor {
- def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
+ def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices
@@ -22,15 +22,15 @@ object GraphProcessor {
}
}
val connectedComponents = joinResult.groupByKey()
- .map[ConnectedComponent](cc => asConnectedComponent(cc))
+ .map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
connectedComponents
}
- def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = {
+ def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
val docs = group._2.toSet[String]
- val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs));
+ val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
connectedComponent
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/BlockStats.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/BlockStats.java
new file mode 100644
index 0000000000..0ec8c26999
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/BlockStats.java
@@ -0,0 +1,45 @@
+
+package eu.dnetlib.dhp.oa.dedup.model;
+
+import java.io.Serializable;
+
+public class BlockStats implements Serializable {
+
+ private String key; // key of the block
+ private Long size; // number of elements in the block
+ private Long comparisons; // number of comparisons in the block
+
+ public BlockStats() {
+ }
+
+ public BlockStats(String key, Long size, Long comparisons) {
+ this.key = key;
+ this.size = size;
+ this.comparisons = comparisons;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public Long getSize() {
+ return size;
+ }
+
+ public void setSize(Long size) {
+ this.size = size;
+ }
+
+ public Long getComparisons() {
+ return comparisons;
+ }
+
+ public void setComparisons(Long comparisons) {
+ this.comparisons = comparisons;
+ }
+
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json
new file mode 100644
index 0000000000..09f4365d34
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "la",
+ "paramLongName": "isLookUpUrl",
+ "paramDescription": "address for the LookUp",
+ "paramRequired": true
+ },
+ {
+ "paramName": "asi",
+ "paramLongName": "actionSetId",
+ "paramDescription": "action set identifier (name of the orchestrator)",
+ "paramRequired": true
+ },
+ {
+ "paramName": "i",
+ "paramLongName": "graphBasePath",
+ "paramDescription": "the base path of the raw graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "workingPath",
+ "paramDescription": "path of the working directory",
+ "paramRequired": true
+ },
+ {
+ "paramName": "np",
+ "paramLongName": "numPartitions",
+ "paramDescription": "number of partitions for the similarity relations intermediate phases",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
index 6eedd54321..b1df08535e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
@@ -17,6 +17,12 @@
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
+ {
+ "paramName": "cc",
+ "paramLongName": "cutConnectedComponent",
+ "paramDescription": "the number of maximum elements that belongs to a connected components",
+ "paramRequired": false
+ },
{
"paramName": "w",
"paramLongName": "workingPath",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
index ce38dc6f00..09f4365d34 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
@@ -22,5 +22,11 @@
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
+ },
+ {
+ "paramName": "np",
+ "paramLongName": "numPartitions",
+ "paramDescription": "number of partitions for the similarity relations intermediate phases",
+ "paramRequired": false
}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 298a248e3b..c42ce12639 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -20,6 +20,10 @@
dedupGraphPath
path for the output graph
+
+ cutConnectedComponent
+ max number of elements in a connected component
+
sparkDriverMemory
memory for driver process
@@ -106,10 +110,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --la${isLookUpUrl}
- --asi${actionSetId}
- --w${workingPath}
+ --graphBasePath${graphBasePath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
+ --workingPath${workingPath}
+ --numPartitions8000
@@ -132,10 +137,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --w${workingPath}
- --la${isLookUpUrl}
- --asi${actionSetId}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingPath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
+ --cutConnectedComponent${cutConnectedComponent}
@@ -158,10 +164,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --w${workingPath}
- --la${isLookUpUrl}
- --asi${actionSetId}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingPath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
@@ -184,9 +190,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --w${workingPath}
- --o${dedupGraphPath}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingPath}
+ --dedupGraphPath${dedupGraphPath}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/config-default.xml
new file mode 100644
index 0000000000..2e0ed9aeea
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml
new file mode 100644
index 0000000000..c0080b028f
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/statistics/oozie_app/workflow.xml
@@ -0,0 +1,108 @@
+
+
+
+ graphBasePath
+ the raw graph base path
+
+
+ isLookUpUrl
+ the address of the lookUp service
+
+
+ actionSetId
+ id of the actionSet
+
+
+ numPartitions
+ number of partitions for the similarity relations intermediate phases
+
+
+ sparkDriverMemory
+ memory for driver process
+
+
+ sparkExecutorMemory
+ memory for individual executor
+
+
+ sparkExecutorCores
+ number of cores used by single executor
+
+
+ oozieActionShareLibForSpark2
+ oozie action sharelib for spark 2.*
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+ spark 2.* extra listeners classname
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+ spark 2.* sql query execution listeners classname
+
+
+ spark2YarnHistoryServerAddress
+ spark 2.* yarn history server address
+
+
+ spark2EventLogDir
+ spark 2.* event log dir location
+
+
+
+
+ ${jobTracker}
+ ${nameNode}
+
+
+ mapreduce.job.queuename
+ ${queueName}
+
+
+ oozie.launcher.mapred.job.queue.name
+ ${oozieLauncherQueueName}
+
+
+ oozie.action.sharelib.for.spark
+ ${oozieActionShareLibForSpark2}
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ Create deduplication blocks
+ eu.dnetlib.dhp.oa.dedup.SparkBlockStats
+ dhp-dedup-openaire-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --graphBasePath${graphBasePath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
+ --workingPath${workingDir}
+ --numPartitions${numPartitions}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
index c91f3c04bc..6a2a48746b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
@@ -1,17 +1,17 @@
[
-{
- "paramName": "i",
- "paramLongName": "graphBasePath",
- "paramDescription": "the base path of raw graph",
- "paramRequired": true
-},
-{
- "paramName": "w",
- "paramLongName": "workingPath",
- "paramDescription": "the working directory path",
- "paramRequired": true
-},
-{
+ {
+ "paramName": "i",
+ "paramLongName": "graphBasePath",
+ "paramDescription": "the base path of raw graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "workingPath",
+ "paramDescription": "the working directory path",
+ "paramRequired": true
+ },
+ {
"paramName": "o",
"paramLongName": "dedupGraphPath",
"paramDescription": "the path of the dedup graph",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
index 513e14f073..3fb9d17512 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java
@@ -47,12 +47,13 @@ public class EntityMergerTest implements Serializable {
@Test
public void softwareMergerTest() throws InstantiationException, IllegalAccessException {
- List> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class);
+ List> softwares = readSample(
+ testEntityBasePath + "/software_merge.json", Software.class);
Software merged = DedupRecordFactory
- .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
+ .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
- System.out.println(merged.getBestaccessright().getClassid());
+ assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
}
@Test
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 88d5f24f90..fb5ebc0993 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@@ -11,6 +13,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
@@ -71,11 +77,13 @@ public class SparkDedupTest implements Serializable {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+ final SparkConf conf = new SparkConf();
+ conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
- .config(new SparkConf())
+ .config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -152,37 +160,42 @@ public class SparkDedupTest implements Serializable {
parser
.parseArgument(
new String[] {
- "-i",
- testGraphBasePath,
- "-asi",
- testActionSetId,
- "-la",
- "lookupurl",
- "-w",
- testOutputBasePath
+ "-i", testGraphBasePath,
+ "-asi", testActionSetId,
+ "-la", "lookupurl",
+ "-w", testOutputBasePath,
+ "-np", "50"
});
new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
- .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
+
long pubs_simrel = spark
.read()
- .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
- long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count();
- long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count();
+ long sw_simrel = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+ .count();
+
+ long ds_simrel = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+ .count();
long orp_simrel = spark
.read()
- .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(3432, orgs_simrel);
- assertEquals(7054, pubs_simrel);
+ assertEquals(7152, pubs_simrel);
assertEquals(344, sw_simrel);
assertEquals(458, ds_simrel);
assertEquals(6750, orp_simrel);
@@ -190,6 +203,101 @@ public class SparkDedupTest implements Serializable {
@Test
@Order(2)
+ public void cutMergeRelsTest() throws Exception {
+
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCreateMergeRels.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+ parser
+ .parseArgument(
+ new String[] {
+ "-i",
+ testGraphBasePath,
+ "-asi",
+ testActionSetId,
+ "-la",
+ "lookupurl",
+ "-w",
+ testOutputBasePath,
+ "-cc",
+ "3"
+ });
+
+ new SparkCreateMergeRels(parser, spark).run(isLookUpService);
+
+ long orgs_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ long pubs_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+ long sw_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ long ds_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ long orp_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ assertEquals(0, orgs_mergerel);
+ assertEquals(0, pubs_mergerel);
+ assertEquals(0, sw_mergerel);
+ assertEquals(0, ds_mergerel);
+ assertEquals(0, orp_mergerel);
+
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel"));
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel"));
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel"));
+ FileUtils
+ .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
+ }
+
+ @Test
+ @Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -225,8 +333,10 @@ public class SparkDedupTest implements Serializable {
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count();
-
- long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count();
+ long ds_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
+ .count();
long orp_mergerel = spark
.read()
@@ -234,14 +344,14 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(1276, orgs_mergerel);
- assertEquals(1440, pubs_mergerel);
+ assertEquals(1442, pubs_mergerel);
assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel);
}
@Test
- @Order(3)
+ @Order(4)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -288,7 +398,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
- @Order(4)
+ @Order(5)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -404,7 +514,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
- @Order(5)
+ @Order(6)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -423,7 +533,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
- assertEquals(4971, relations);
+ assertEquals(4866, relations);
// check deletedbyinference
final Dataset mergeRels = spark
@@ -454,7 +564,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
- @Order(6)
+ @Order(7)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
new file mode 100644
index 0000000000..7e76c284b7
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkStatsTest.java
@@ -0,0 +1,177 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import static java.nio.file.Files.createTempDirectory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.lenient;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+@ExtendWith(MockitoExtension.class)
+public class SparkStatsTest implements Serializable {
+
+ @Mock(serializable = true)
+ ISLookUpService isLookUpService;
+
+ private static SparkSession spark;
+ private static JavaSparkContext jsc;
+
+ private static String testGraphBasePath;
+ private static String testOutputBasePath;
+ private static final String testActionSetId = "test-orchestrator";
+
+ @BeforeAll
+ public static void cleanUp() throws IOException, URISyntaxException {
+
+ testGraphBasePath = Paths
+ .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
+ .toFile()
+ .getAbsolutePath();
+ testOutputBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
+ .toAbsolutePath()
+ .toString();
+
+ FileUtils.deleteDirectory(new File(testOutputBasePath));
+
+ final SparkConf conf = new SparkConf();
+ conf.set("spark.sql.shuffle.partitions", "200");
+ spark = SparkSession
+ .builder()
+ .appName(SparkDedupTest.class.getSimpleName())
+ .master("local[*]")
+ .config(conf)
+ .getOrCreate();
+
+ jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+
+ @BeforeEach
+ public void setUp() throws IOException, ISLookUpException {
+
+ lenient()
+ .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
+ .thenReturn(
+ IOUtils
+ .toString(
+ SparkDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml")));
+
+ lenient()
+ .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
+ .thenReturn(
+ IOUtils
+ .toString(
+ SparkDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
+
+ lenient()
+ .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
+ .thenReturn(
+ IOUtils
+ .toString(
+ SparkDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
+
+ lenient()
+ .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
+ .thenReturn(
+ IOUtils
+ .toString(
+ SparkDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
+
+ lenient()
+ .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("dataset")))
+ .thenReturn(
+ IOUtils
+ .toString(
+ SparkDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json")));
+
+ lenient()
+ .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("otherresearchproduct")))
+ .thenReturn(
+ IOUtils
+ .toString(
+ SparkDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json")));
+ }
+
+ @Test
+ public void createBlockStatsTest() throws Exception {
+
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCreateSimRels.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/createBlockStats_parameters.json")));
+ parser
+ .parseArgument(
+ new String[] {
+ "-i", testGraphBasePath,
+ "-asi", testActionSetId,
+ "-la", "lookupurl",
+ "-w", testOutputBasePath
+ });
+
+ new SparkBlockStats(parser, spark).run(isLookUpService);
+
+ long orgs_blocks = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_blockstats")
+ .count();
+
+ long pubs_blocks = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_blockstats")
+ .count();
+
+ long sw_blocks = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/software_blockstats")
+ .count();
+
+ long ds_blocks = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_blockstats")
+ .count();
+
+ long orp_blocks = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
+ .count();
+
+ assertEquals(121, orgs_blocks);
+ assertEquals(110, pubs_blocks);
+ assertEquals(21, sw_blocks);
+ assertEquals(67, ds_blocks);
+ assertEquals(55, orp_blocks);
+ }
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json
index 2469b2cc03..fa889d63b7 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType" : "resulttype",
"subEntityValue" : "dataset",
"orderField" : "title",
- "queueMaxSize" : "800",
+ "queueMaxSize" : "100",
"groupMaxSize" : "100",
"maxChildren" : "100",
- "slidingWindowSize" : "80",
+ "slidingWindowSize" : "100",
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true",
"idPath" : "$.id",
@@ -17,7 +17,8 @@
},
"pace" : {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json
index 4adcc0439f..b45b6ae832 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType" : "resulttype",
"subEntityValue" : "otherresearchproduct",
"orderField" : "title",
- "queueMaxSize" : "800",
+ "queueMaxSize" : "100",
"groupMaxSize" : "100",
"maxChildren" : "100",
- "slidingWindowSize" : "80",
+ "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true",
"idPath" : "$.id",
@@ -17,7 +17,8 @@
},
"pace" : {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json
index ef0b26af41..15ebc7a6a0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType": "resulttype",
"subEntityValue": "publication",
"orderField": "title",
- "queueMaxSize": "800",
+ "queueMaxSize": "100",
"groupMaxSize": "100",
"maxChildren": "100",
- "slidingWindowSize": "80",
+ "slidingWindowSize": "100",
"rootBuilder": [
"result",
"resultProject_outcome_isProducedBy",
@@ -29,7 +29,8 @@
},
"pace": {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json
index 623abbf9f4..f53ff385f9 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType" : "resulttype",
"subEntityValue" : "software",
"orderField" : "title",
- "queueMaxSize" : "800",
+ "queueMaxSize" : "100",
"groupMaxSize" : "100",
"maxChildren" : "100",
- "slidingWindowSize" : "80",
+ "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true",
"idPath" : "$.id",
@@ -17,8 +17,9 @@
},
"pace" : {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
- { "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } }
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+ { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {
"start": {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index fd707e949a..7091d97403 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -8,7 +8,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
-import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@@ -24,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -151,7 +151,8 @@ public class CleanGraphSparkJob {
if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) {
Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance());
if (Objects.isNull(bestaccessrights)) {
- r.setBestaccessright(
+ r
+ .setBestaccessright(
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} else {
r.setBestaccessright(bestaccessrights);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
index 575f9229ed..eea8d0a5ab 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
@@ -16,6 +16,11 @@
postgresPassword
the password postgres
+
+ dbSchema
+ beta
+ the database schema according to the D-Net infrastructure (beta or production)
+
isLookupUrl
the address of the lookUp service
@@ -93,6 +98,7 @@
--postgresUser${postgresUser}
--postgresPassword${postgresPassword}
--isLookupUrl${isLookupUrl}
+ --dbschema${dbSchema}
@@ -109,6 +115,7 @@
--postgresUser${postgresUser}
--postgresPassword${postgresPassword}
--isLookupUrl${isLookupUrl}
+ --dbschema${dbSchema}
--actionclaims
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
index 80b8000173..b08e593f73 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -9,6 +9,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -118,10 +119,7 @@ public class CreateRelatedEntitiesJob_phase1 {
Dataset> entities = readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false")
.map(
- (MapFunction) value -> asRelatedEntity(value, clazz),
- Encoders.kryo(RelatedEntity.class))
- .map(
- (MapFunction>) e -> new Tuple2<>(e.getId(), e),
+ (MapFunction>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
.cache();
@@ -165,13 +163,24 @@ public class CreateRelatedEntitiesJob_phase1 {
Result result = (Result) entity;
if (result.getTitle() != null && !result.getTitle().isEmpty()) {
- re.setTitle(result.getTitle().stream().findFirst().get());
+ final StructuredProperty title = result.getTitle().stream().findFirst().get();
+ title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
+ re.setTitle(title);
}
re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(result.getResulttype());
- re.setInstances(result.getInstance());
+ if (Objects.nonNull(result.getInstance())) {
+ re
+ .setInstances(
+ result
+ .getInstance()
+ .stream()
+ .filter(Objects::nonNull)
+ .limit(ProvisionConstants.MAX_INSTANCES)
+ .collect(Collectors.toList()));
+ }
// TODO still to be mapped
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
index bfcc648a35..7e175121e5 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final int MAX_EXTERNAL_ENTITIES = 50;
- private static final int MAX_AUTHORS = 200;
- private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
- private static final int MAX_TITLE_LENGTH = 5000;
- private static final int MAX_ABSTRACT_LENGTH = 100000;
-
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List refs = r
.getExternalReference()
.stream()
- .limit(MAX_EXTERNAL_ENTITIES)
+ .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList());
r.setExternalReference(refs);
}
if (r.getAuthor() != null) {
List authors = Lists.newArrayList();
for (Author a : r.getAuthor()) {
- a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH));
- if (authors.size() < MAX_AUTHORS || hasORCID(a)) {
+ a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
+ if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
authors.add(a);
}
}
@@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(d -> {
- d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH));
+ d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
@@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(t -> {
- t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH));
+ t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
return t;
})
+ .limit(ProvisionConstants.MAX_TITLES)
.collect(Collectors.toList());
r.setTitle(titles);
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
index eb63d4423d..da0a810217 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -100,11 +100,17 @@ public class PrepareRelationsJob {
.orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter);
- int maxRelations = Optional
- .ofNullable(parser.get("maxRelations"))
+ int sourceMaxRelations = Optional
+ .ofNullable(parser.get("sourceMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
- log.info("maxRelations: {}", maxRelations);
+ log.info("sourceMaxRelations: {}", sourceMaxRelations);
+
+ int targetMaxRelations = Optional
+ .ofNullable(parser.get("targetMaxRelations"))
+ .map(Integer::valueOf)
+ .orElse(MAX_RELS);
+ log.info("targetMaxRelations: {}", targetMaxRelations);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -116,7 +122,8 @@ public class PrepareRelationsJob {
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDD(
- spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
+ spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
+ relPartitions);
});
}
@@ -129,31 +136,40 @@ public class PrepareRelationsJob {
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
- * @param maxRelations maximum number of allowed outgoing edges
+ * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
+ * @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
* @param relPartitions number of partitions for the output RDD
*/
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
- Set relationFilter, int maxRelations, int relPartitions) {
+ Set relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
- // group by SOURCE and apply limit
- RDD bySource = readPathRelationRDD(spark, inputRelationsPath)
+ JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
- .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
- .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
+ .filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
+
+ JavaRDD pruned = pruneRels(
+ pruneRels(
+ rels,
+ sourceMaxRelations, relPartitions, (Function) r -> r.getSource()),
+ targetMaxRelations, relPartitions, (Function) r -> r.getTarget());
+ spark
+ .createDataset(pruned.rdd(), Encoders.bean(Relation.class))
+ .repartition(relPartitions)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outputPath);
+ }
+
+ private static JavaRDD pruneRels(JavaRDD rels, int maxRelations,
+ int relPartitions, Function idFn) {
+ return rels
+ .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator)
- .map(Tuple2::_2)
- .rdd();
-
- spark
- .createDataset(bySource, Encoders.bean(Relation.class))
- .repartition(relPartitions)
- .write()
- .mode(SaveMode.Overwrite)
- .parquet(outputPath);
+ .map(Tuple2::_2);
}
// experimental
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
new file mode 100644
index 0000000000..9bc3706cdd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
@@ -0,0 +1,14 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+public class ProvisionConstants {
+
+ public static final int MAX_EXTERNAL_ENTITIES = 50;
+ public static final int MAX_AUTHORS = 200;
+ public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
+ public static final int MAX_TITLE_LENGTH = 5000;
+ public static final int MAX_TITLES = 10;
+ public static final int MAX_ABSTRACT_LENGTH = 100000;
+ public static final int MAX_INSTANCES = 10;
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
index bf7f9330d1..bd7b4d78ee 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
@@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable, Ser
private static final Map weights = Maps.newHashMap();
static {
- weights.put("outcome", 0);
- weights.put("supplement", 1);
- weights.put("review", 2);
- weights.put("citation", 3);
- weights.put("affiliation", 4);
- weights.put("relationship", 5);
- weights.put("publicationDataset", 6);
- weights.put("similarity", 7);
+ weights.put("participation", 0);
- weights.put("provision", 8);
- weights.put("participation", 9);
- weights.put("dedup", 10);
+ weights.put("outcome", 1);
+ weights.put("affiliation", 2);
+ weights.put("dedup", 3);
+ weights.put("publicationDataset", 4);
+ weights.put("citation", 5);
+ weights.put("supplement", 6);
+ weights.put("review", 7);
+ weights.put("relationship", 8);
+ weights.put("provision", 9);
+ weights.put("similarity", 10);
}
private static final long serialVersionUID = 3232323;
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
index db9a68d3da..53d4c888ea 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
@@ -329,7 +329,7 @@ public class XmlRecordFactory implements Serializable {
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue()))
- .collect(Collectors.toList()));
+ .collect(Collectors.toCollection(HashSet::new)));
}
if (r.getEmbargoenddate() != null) {
metadata
@@ -370,7 +370,7 @@ public class XmlRecordFactory implements Serializable {
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("source", c.getValue()))
- .collect(Collectors.toList()));
+ .collect(Collectors.toCollection(HashSet::new)));
}
if (r.getFormat() != null) {
metadata
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
index 71b2becc4d..33fa1dc8df 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
@@ -30,9 +30,16 @@
"paramRequired": false
},
{
- "paramName": "mr",
- "paramLongName": "maxRelations",
- "paramDescription": "maximum number of relations allowed for a each entity",
+ "paramName": "smr",
+ "paramLongName": "sourceMaxRelations",
+ "paramDescription": "maximum number of relations allowed for a each entity grouping by source",
+ "paramRequired": false
+ },
+ {
+ "paramName": "tmr",
+ "paramLongName": "targetMaxRelations",
+ "paramDescription": "maximum number of relations allowed for a each entity grouping by target",
"paramRequired": false
}
+
]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index faa81ad644..91ced378ca 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -18,8 +18,12 @@
filter applied reading relations (by relClass)
- maxRelations
- maximum number of relations allowed for a each entity
+ sourceMaxRelations
+ maximum number of relations allowed for a each entity grouping by source
+
+
+ targetMaxRelations
+ maximum number of relations allowed for a each entity grouping by target
otherDsTypeId
@@ -133,7 +137,8 @@
--inputRelationsPath${inputGraphRootPath}/relation
--outputPath${workingDir}/relation
- --maxRelations${maxRelations}
+ --sourceMaxRelations${sourceMaxRelations}
+ --targetMaxRelations${targetMaxRelations}
--relationFilter${relationFilter}
--relPartitions5000
@@ -166,7 +171,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -193,7 +198,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -220,7 +225,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -247,7 +252,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -274,7 +279,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -301,7 +306,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -328,7 +333,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputRelationsPath${workingDir}/relation
@@ -367,7 +372,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=15360
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/publication
@@ -395,7 +400,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/dataset
@@ -423,7 +428,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/otherresearchproduct
@@ -451,7 +456,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/software
@@ -479,7 +484,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=8000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/datasource
@@ -507,7 +512,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/organization
@@ -535,7 +540,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
--inputEntityPath${inputGraphRootPath}/project
@@ -573,10 +578,18 @@
--isLookupUrl${isLookupUrl}
--otherDsTypeId${otherDsTypeId}
-
+
+
+
+ ${wf:conf('shouldIndex') eq 'true'}
+ ${wf:conf('shouldIndex') eq 'false'}
+
+
+
+
yarn
@@ -607,5 +620,4 @@
-
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4619f31749..5ece9aaf68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -324,6 +324,12 @@
+ eu.dnetlib.dhp
+ dnet-openaire-broker-common
+ ${dnet.openaire.broker.common}
+
+
+
org.apache.cxf
cxf-rt-transports-http
3.1.5
@@ -618,5 +624,6 @@
3.3.3
3.4.2
[2.12,3.0)
+ 3.1.0