diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index e3182c259..119031b06 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -59,7 +59,7 @@
 			<groupId>eu.dnetlib</groupId>
 			<artifactId>dnet-openaire-broker-common</artifactId>
-			<version>[3.0.3,4.0.0)</version>
+			<version>[3.0.4,4.0.0)</version>
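(Note: `[3.0.4,4.0.0)` is a Maven version range: any `dnet-openaire-broker-common` release from 3.0.4 inclusive up to, but excluding, 4.0.0. The bumped lower bound presumably brings in the datasource-type support used below, such as `getCollectedFromType()` and the extra `OaBrokerProvenance` constructor argument.)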
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 315a054d3..49e750698 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -42,6 +42,7 @@ public class EventFactory {
res.setCreationDate(now);
res.setExpiryDate(calculateExpiryDate(now));
res.setInstantMessage(false);
+
return res;
}
@@ -53,6 +54,7 @@ public class EventFactory {
map.setTargetDatasourceId(target.getCollectedFromId());
map.setTargetDatasourceName(target.getCollectedFromName());
+ map.setTargetDatasourceType(target.getCollectedFromType());
map.setTargetResultId(target.getOpenaireId());
@@ -73,6 +75,7 @@ public class EventFactory {
map.setTrust(updateInfo.getTrust());
map.setProvenanceDatasourceId(source.getCollectedFromId());
map.setProvenanceDatasourceName(source.getCollectedFromName());
+ map.setProvenanceDatasourceType(source.getCollectedFromType());
map.setProvenanceResultId(source.getOpenaireId());
return map;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
index 4b0ed171b..2c1be3ba4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java
@@ -13,6 +13,7 @@ public class MappedFields implements Serializable {
private String targetDatasourceId;
private String targetDatasourceName;
+ private String targetDatasourceType;
private String targetResultId;
private String targetResultTitle;
private long targetDateofacceptance;
@@ -21,6 +22,7 @@ public class MappedFields implements Serializable {
private float trust;
private String provenanceDatasourceId;
private String provenanceDatasourceName;
+ private String provenanceDatasourceType;
private String provenanceResultId;
public String getTargetDatasourceId() {
@@ -39,6 +41,14 @@ public class MappedFields implements Serializable {
this.targetDatasourceName = targetDatasourceName;
}
+ public String getTargetDatasourceType() {
+ return targetDatasourceType;
+ }
+
+ public void setTargetDatasourceType(final String targetDatasourceType) {
+ this.targetDatasourceType = targetDatasourceType;
+ }
+
public String getTargetResultId() {
return targetResultId;
}
@@ -103,6 +113,14 @@ public class MappedFields implements Serializable {
this.provenanceDatasourceName = provenanceDatasourceName;
}
+ public String getProvenanceDatasourceType() {
+ return provenanceDatasourceType;
+ }
+
+ public void setProvenanceDatasourceType(final String provenanceDatasourceType) {
+ this.provenanceDatasourceType = provenanceDatasourceType;
+ }
+
public String getProvenanceResultId() {
return provenanceResultId;
}
@@ -111,4 +129,8 @@ public class MappedFields implements Serializable {
this.provenanceResultId = provenanceResultId;
}
+ public static long getSerialversionuid() {
+ return serialVersionUID;
+ }
+
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
index d6ac71429..5d3121aed 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -3,11 +3,15 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
@@ -59,6 +63,15 @@ public class GenerateEventsJob {
final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath);
+ final Set<String> dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist");
+ log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ","));
+
+ final Set<String> dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist");
+ log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ","));
+
+ final Set<String> dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist");
+ log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
+
final SparkConf conf = new SparkConf();
// TODO UNCOMMENT
@@ -77,9 +90,12 @@ public class GenerateEventsJob {
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
final Dataset<Event> dataset = groups
- .map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class))
- .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class))
- .map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class));
+ .map(
+ g -> EventFinder
+ .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators),
+ Encoders
+ .bean(EventGroup.class))
+ .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
ClusterUtils.save(dataset, eventsPath, Event.class, total);
@@ -87,6 +103,22 @@ public class GenerateEventsJob {
}
+ private static Set<String> parseParamAsList(final ArgumentApplicationParser parser, final String key) {
+ final String s = parser.get(key).trim();
+
+ final Set<String> res = new HashSet<>();
+
+ if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list
+ Arrays
+ .stream(s.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank)
+ .forEach(res::add);
+ }
+
+ return res;
+ }
+
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
return EventFinder
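A note on the `-` convention used by `parseParamAsList` above: any value of length one (the workflows pass `-`) is treated as an empty list, everything else is split on commas. A standalone sketch mirroring the same rule (class and method names are hypothetical, not part of this patch):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ParseParamSketch {

	// Mirrors parseParamAsList: a single-character value (e.g. "-") means
	// "empty list"; otherwise split on commas, trim, and drop blank tokens.
	static Set<String> parse(final String raw) {
		final String s = raw.trim();
		if (s.length() <= 1) {
			return new HashSet<>();
		}
		return Arrays
			.stream(s.split(","))
			.map(String::trim)
			.filter(v -> !v.isEmpty())
			.collect(Collectors.toSet());
	}

	public static void main(final String[] args) {
		System.out.println(parse("-")); // [] -> no filtering on this criterion
		System.out.println(parse("ds1, ds2 ,,ds3")); // three entries: ds1, ds2, ds3
	}
}
```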
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
index 36d0ffd1b..9124d18e3 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
@@ -48,6 +48,7 @@ public class IndexOnESJob {
final JavaRDD<String> inputRdd = ClusterUtils
.readPath(spark, eventsPath, Event.class)
+ .limit(10000) // TODO REMOVE
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
.javaRDD();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
new file mode 100644
index 000000000..eb1825fa5
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
@@ -0,0 +1,80 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
+import scala.Tuple2;
+
+public class JoinStep0Job {
+
+ private static final Logger log = LoggerFactory.getLogger(JoinStep0Job.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ JoinStep0Job.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String outputPath = workingPath + "/joinedEntities_step0";
+ log.info("outputPath: {}", outputPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, outputPath);
+
+ final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
+ final Dataset<OaBrokerMainEntity> sources = ClusterUtils
+ .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
+
+ final Dataset<SimpleDatasourceInfo> datasources = ClusterUtils
+ .readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class);
+
+ final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator()
+ .toColumn();
+
+ final Dataset<OaBrokerMainEntity> dataset = sources
+ .joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner")
+ .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
+ .agg(aggr)
+ .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+ ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total);
+
+ });
+
+ }
+
+}
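For readers less used to Spark's typed API, the chain above does three things: the inner `joinWith` pairs every entity with the datasource row whose `id` matches the entity's `collectedFromId`; `groupByKey` re-keys those pairs by the entity's OpenAIRE id; and `agg` collapses each group through the aggregator, whose `(key, value)` output tuple the final `map` unwraps. A sketch of the same pipeline with the intermediate types spelled out (variable names are illustrative, and `sources`/`datasources` are assumed from the job above):

```java
final Dataset<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>> joined = sources
	.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner");

final Dataset<Tuple2<String, OaBrokerMainEntity>> byResultId = joined
	.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
	.agg(new AddDatasourceTypeAggregator().toColumn());

final Dataset<OaBrokerMainEntity> enriched = byResultId
	.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
```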
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
index f9bf2d146..8e502f736 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
@@ -55,7 +55,7 @@ public class JoinStep1Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
- .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
+ .readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class);
final Dataset<RelatedProject> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
new file mode 100644
index 000000000..30f5ddac3
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
@@ -0,0 +1,68 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+
+public class PrepareRelatedDatasourcesJob {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasourcesJob.class);
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ PrepareRelatedDatasourcesJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String graphPath = parser.get("graphPath");
+ log.info("graphPath: {}", graphPath);
+
+ final String workingPath = parser.get("workingPath");
+ log.info("workingPath: {}", workingPath);
+
+ final String relsPath = workingPath + "/datasources";
+ log.info("relsPath: {}", relsPath);
+
+ final SparkConf conf = new SparkConf();
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils.removeDir(spark, relsPath);
+
+ final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
+
+ final Dataset<SimpleDatasourceInfo> dataset = ClusterUtils
+ .readPath(spark, graphPath + "/datasource", Datasource.class)
+ .map(
+ ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()),
+ Encoders.bean(SimpleDatasourceInfo.class));
+
+ ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total);
+
+ });
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index 5ed55247b..e7abae68b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -4,8 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.spark.util.LongAccumulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory;
@@ -38,6 +41,8 @@ import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder {
+ private static final Logger log = LoggerFactory.getLogger(EventFinder.class);
+
private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
static {
matchers.add(new EnrichMissingAbstract());
@@ -69,19 +74,39 @@ public class EventFinder {
}
public static EventGroup generateEvents(final ResultGroup results,
+ final Set<String> dsIdWhitelist,
+ final Set<String> dsIdBlacklist,
+ final Set<String> dsTypeWhitelist,
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
+
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) {
- for (final UpdateMatcher<?> matcher : matchers) {
- list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
+ if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
+ for (final UpdateMatcher<?> matcher : matchers) {
+ list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
+ }
}
}
return asEventGroup(list);
}
+ private static boolean verifyTarget(final OaBrokerMainEntity target,
+ final Set<String> dsIdWhitelist,
+ final Set<String> dsIdBlacklist,
+ final Set<String> dsTypeWhitelist) {
+
+ if (dsIdWhitelist.contains(target.getCollectedFromId())) {
+ return true;
+ } else if (dsIdBlacklist.contains(target.getCollectedFromId())) {
+ return false;
+ } else {
+ return dsTypeWhitelist.contains(target.getCollectedFromType());
+ }
+ }
+
private static EventGroup asEventGroup(final List<UpdateInfo<?>> list) {
final EventGroup events = new EventGroup();
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
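The precedence implemented by `verifyTarget` is: the id whitelist always accepts, then the id blacklist rejects, and otherwise the type whitelist decides. A minimal self-contained sketch (ids are invented; `pubsrepository::institutional` is only a plausible vocabulary term, not a value taken from this patch):

```java
import java.util.Collections;
import java.util.Set;

public class VerifyTargetSketch {

	// Same precedence as EventFinder.verifyTarget:
	// id whitelist > id blacklist > type whitelist.
	static boolean verify(final String id, final String type,
		final Set<String> idWl, final Set<String> idBl, final Set<String> typeWl) {
		if (idWl.contains(id)) {
			return true;
		} else if (idBl.contains(id)) {
			return false;
		} else {
			return typeWl.contains(type);
		}
	}

	public static void main(final String[] args) {
		final Set<String> idWl = Collections.singleton("ds-A");
		final Set<String> idBl = Collections.singleton("ds-B");
		final Set<String> typeWl = Collections.singleton("pubsrepository::institutional");

		System.out.println(verify("ds-A", "any-type", idWl, idBl, typeWl)); // true: id whitelisted
		System.out.println(verify("ds-B", "pubsrepository::institutional", idWl, idBl, typeWl)); // false: id blacklisted
		System.out.println(verify("ds-C", "pubsrepository::institutional", idWl, idBl, typeWl)); // true: type whitelisted
		System.out.println(verify("ds-C", "some-other-type", idWl, idBl, typeWl)); // false
	}
}
```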
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 048683b50..0586b681e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -113,6 +113,7 @@ public final class UpdateInfo<T> {
final String provId = getSource().getOpenaireId();
final String provRepo = getSource().getCollectedFromName();
+ final String provType = getSource().getCollectedFromType();
final String provUrl = getSource()
.getInstances()
@@ -122,7 +123,7 @@ public final class UpdateInfo {
.orElse(null);
;
- final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl);
+ final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provType, provUrl);
final OaBrokerEventPayload res = new OaBrokerEventPayload();
res.setResult(target);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java
new file mode 100644
index 000000000..ccd15c8c6
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import scala.Tuple2;
+
+public class AddDatasourceTypeAggregator
+ extends Aggregator<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity, OaBrokerMainEntity> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 8788588975496014728L;
+
+ @Override
+ public OaBrokerMainEntity zero() {
+ return new OaBrokerMainEntity();
+ }
+
+ @Override
+ public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
+ return g;
+ }
+
+ @Override
+ public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
+ final Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo> t) {
+ final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
+ if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) {
+ res.setCollectedFromType(t._2.getType());
+ }
+ return res;
+
+ }
+
+ @Override
+ public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
+ if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) {
+ return g1;
+ } else {
+ return g2;
+ }
+ }
+
+ @Override
+ public Encoder<OaBrokerMainEntity> bufferEncoder() {
+ return Encoders.bean(OaBrokerMainEntity.class);
+ }
+
+ @Override
+ public Encoder<OaBrokerMainEntity> outputEncoder() {
+ return Encoders.bean(OaBrokerMainEntity.class);
+ }
+}
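To make the aggregator's buffer semantics concrete, a hypothetical plain-Java walkthrough (no Spark runtime involved; the values, and the assumption that `OaBrokerMainEntity` exposes matching bean setters, are mine):

```java
final AddDatasourceTypeAggregator aggr = new AddDatasourceTypeAggregator();

final OaBrokerMainEntity entity = new OaBrokerMainEntity();
entity.setOpenaireId("result-1"); // invented id
entity.setCollectedFromId("datasource-1"); // invented id

final SimpleDatasourceInfo ds = new SimpleDatasourceInfo("datasource-1", "pubsrepository::institutional");

// zero() is an empty buffer: reduce() therefore keeps `entity` (the first
// buffer with a non-blank openaireId) and stamps the datasource type on it.
final OaBrokerMainEntity buf = aggr.reduce(aggr.zero(), new Tuple2<>(entity, ds));
// buf.getOpenaireId()        -> "result-1"
// buf.getCollectedFromType() -> "pubsrepository::institutional"

// merge() prefers the first buffer when it already carries both an openaireId
// and a collectedFromType, and falls back to the second otherwise, so
// combining partial buffers from different partitions cannot lose the type.
```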
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java
new file mode 100644
index 000000000..966f63fa0
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java
@@ -0,0 +1,40 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
+
+import java.io.Serializable;
+
+public class SimpleDatasourceInfo implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 2996609859416024734L;
+
+ private String id;
+ private String type;
+
+ public SimpleDatasourceInfo() {
+ }
+
+ public SimpleDatasourceInfo(final String id, final String type) {
+ this.id = id;
+ this.type = type;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 7667bfba7..a0c7b00db 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -16,6 +16,21 @@
 			<name>dedupConfProfId</name>
 			<description>the id of a valid Dedup Configuration Profile</description>
 		</property>
+		<property>
+			<name>datasourceIdWhitelist</name>
+			<value>-</value>
+			<description>a white list (comma separated, - for empty list) of datasource ids</description>
+		</property>
+		<property>
+			<name>datasourceTypeWhitelist</name>
+			<value>-</value>
+			<description>a white list (comma separated, - for empty list) of datasource types</description>
+		</property>
+		<property>
+			<name>datasourceIdBlacklist</name>
+			<value>-</value>
+			<description>a black list (comma separated, - for empty list) of datasource ids</description>
+		</property>
 		<property>
 			<name>esIndexName</name>
@@ -96,6 +111,7 @@
+		<path start="prepare_related_datasources"/>
@@ -125,6 +141,30 @@
+
+	<action name="prepare_related_datasources">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>PrepareRelatedDatasourcesJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasourcesJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+		</spark>
+		<ok to="wait_entities_and_rels"/>
+		<error to="Kill"/>
+	</action>
@@ -223,7 +263,31 @@
-	<join name="wait_entities_and_rels" to="join_entities_step1"/>
+	<join name="wait_entities_and_rels" to="join_entities_step0"/>
+
+	<action name="join_entities_step0">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>JoinStep0</name>
+			<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+		</spark>
+		<ok to="join_entities_step1"/>
+		<error to="Kill"/>
+	</action>
@@ -365,6 +429,9 @@
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
 			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
+			<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
+			<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
+			<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
index 7ae076159..c545884f9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
@@ -16,5 +16,23 @@
"paramLongName": "dedupConfProfile",
"paramDescription": "the id of a valid Dedup Configuration Profile",
"paramRequired": true
+ },
+ {
+ "paramName": "datasourceIdWhitelist",
+ "paramLongName": "datasourceIdWhitelist",
+ "paramDescription": "a white list (comma separeted, - for empty list) of datasource ids",
+ "paramRequired": true
+ },
+ {
+ "paramName": "datasourceTypeWhitelist",
+ "paramLongName": "datasourceTypeWhitelist",
+ "paramDescription": "a white list (comma separeted, - for empty list) of datasource types",
+ "paramRequired": true
+ },
+ {
+ "paramName": "datasourceIdBlacklist",
+ "paramLongName": "datasourceIdBlacklist",
+ "paramDescription": "a black list (comma separeted, - for empty list) of datasource ids",
+ "paramRequired": true
}
]
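As a usage note: all three parameters are declared required, so a caller that wants no filtering must pass the `-` sentinel explicitly, e.g. `--datasourceIdWhitelist -` and `--datasourceIdBlacklist -`, while a run restricted to one datasource type might pass something like `--datasourceTypeWhitelist pubsrepository::institutional` (an illustrative vocabulary term, not a value from this patch).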
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index 9128c9820..b38290448 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -73,23 +73,24 @@
-	<start to="generate_events"/>
+	<start to="index_es"/>
 
 	<kill name="Kill">
 		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 	</kill>
 
-	<action name="generate_events">
+
+	<action name="index_es">
 		<spark xmlns="uri:oozie:spark-action:0.2">
 			<master>yarn</master>
 			<mode>cluster</mode>
-			<name>GenerateEventsJob</name>
-			<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
+			<name>IndexOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
 			<jar>dhp-broker-events-${projectVersion}.jar</jar>
 			<spark-opts>
-				--executor-cores=${sparkExecutorCores}
 				--executor-memory=${sparkExecutorMemory}
 				--driver-memory=${sparkDriverMemory}
+				--conf spark.dynamicAllocation.maxExecutors="2"
 				--conf spark.extraListeners=${spark2ExtraListeners}
 				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -97,8 +98,8 @@
 				--conf spark.sql.shuffle.partitions=3840
 			</spark-opts>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
-			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
-			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
+			<arg>--index</arg><arg>${esIndexName}</arg>
+			<arg>--esHost</arg><arg>${esIndexHost}</arg>