From b413db0bff8d7ae4d5c9bfb4eb38205a198325da Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 2 Jul 2020 12:43:03 +0200 Subject: [PATCH] white/blacklists --- dhp-workflows/dhp-broker-events/pom.xml | 2 +- .../dhp/broker/model/EventFactory.java | 3 + .../dhp/broker/model/MappedFields.java | 22 +++++ .../dhp/broker/oa/GenerateEventsJob.java | 38 ++++++++- .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 1 + .../dnetlib/dhp/broker/oa/JoinStep0Job.java | 80 +++++++++++++++++++ .../dnetlib/dhp/broker/oa/JoinStep1Job.java | 2 +- .../oa/PrepareRelatedDatasourcesJob.java | 68 ++++++++++++++++ .../dhp/broker/oa/util/EventFinder.java | 29 ++++++- .../dhp/broker/oa/util/UpdateInfo.java | 3 +- .../withRels/AddDatasourceTypeAggregator.java | 59 ++++++++++++++ .../withRels/SimpleDatasourceInfo.java | 40 ++++++++++ .../oa/generate_all/oozie_app/workflow.xml | 69 +++++++++++++++- .../dhp/broker/oa/generate_events.json | 18 +++++ .../broker/oa/partial/oozie_app/workflow.xml | 15 ++-- 15 files changed, 433 insertions(+), 16 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index e3182c259..119031b06 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -59,7 +59,7 @@ eu.dnetlib dnet-openaire-broker-common - [3.0.3,4.0.0) + [3.0.4,4.0.0) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 315a054d3..49e750698 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -42,6 +42,7 @@ public class EventFactory { res.setCreationDate(now); res.setExpiryDate(calculateExpiryDate(now)); res.setInstantMessage(false); + return res; } @@ -53,6 +54,7 @@ public class EventFactory { map.setTargetDatasourceId(target.getCollectedFromId()); map.setTargetDatasourceName(target.getCollectedFromName()); + map.setTargetDatasourceType(target.getCollectedFromType()); map.setTargetResultId(target.getOpenaireId()); @@ -73,6 +75,7 @@ public class EventFactory { map.setTrust(updateInfo.getTrust()); map.setProvenanceDatasourceId(source.getCollectedFromId()); map.setProvenanceDatasourceName(source.getCollectedFromName()); + map.setProvenanceDatasourceType(source.getCollectedFromType()); map.setProvenanceResultId(source.getOpenaireId()); return map; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java index 4b0ed171b..2c1be3ba4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MappedFields.java @@ -13,6 +13,7 @@ public class MappedFields 
implements Serializable { private String targetDatasourceId; private String targetDatasourceName; + private String targetDatasourceType; private String targetResultId; private String targetResultTitle; private long targetDateofacceptance; @@ -21,6 +22,7 @@ public class MappedFields implements Serializable { private float trust; private String provenanceDatasourceId; private String provenanceDatasourceName; + private String provenanceDatasourceType; private String provenanceResultId; public String getTargetDatasourceId() { @@ -39,6 +41,14 @@ public class MappedFields implements Serializable { this.targetDatasourceName = targetDatasourceName; } + public String getTargetDatasourceType() { + return targetDatasourceType; + } + + public void setTargetDatasourceType(final String targetDatasourceType) { + this.targetDatasourceType = targetDatasourceType; + } + public String getTargetResultId() { return targetResultId; } @@ -103,6 +113,14 @@ public class MappedFields implements Serializable { this.provenanceDatasourceName = provenanceDatasourceName; } + public String getProvenanceDatasourceType() { + return provenanceDatasourceType; + } + + public void setProvenanceDatasourceType(final String provenanceDatasourceType) { + this.provenanceDatasourceType = provenanceDatasourceType; + } + public String getProvenanceResultId() { return provenanceResultId; } @@ -111,4 +129,8 @@ public class MappedFields implements Serializable { this.provenanceResultId = provenanceResultId; } + public static long getSerialversionuid() { + return serialVersionUID; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index d6ac71429..5d3121aed 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -3,11 +3,15 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.util.Arrays; +import java.util.HashSet; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.Dataset; @@ -59,6 +63,15 @@ public class GenerateEventsJob { final String eventsPath = workingPath + "/events"; log.info("eventsPath: {}", eventsPath); + final Set dsIdWhitelist = parseParamAsList(parser, "datasourceIdWhitelist"); + log.info("datasourceIdWhitelist: {}", StringUtils.join(dsIdWhitelist, ",")); + + final Set dsTypeWhitelist = parseParamAsList(parser, "datasourceTypeWhitelist"); + log.info("datasourceTypeWhitelist: {}", StringUtils.join(dsTypeWhitelist, ",")); + + final Set dsIdBlacklist = parseParamAsList(parser, "datasourceIdBlacklist"); + log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ",")); + final SparkConf conf = new SparkConf(); // TODO UNCOMMENT @@ -77,9 +90,12 @@ public class GenerateEventsJob { .readPath(spark, workingPath + "/duplicates", ResultGroup.class); final Dataset dataset = groups - .map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class)) - .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)) - .map(e -> ClusterUtils.incrementAccumulator(e, total), 
Encoders.bean(Event.class)); + .map( + g -> EventFinder + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators), + Encoders + .bean(EventGroup.class)) + .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); ClusterUtils.save(dataset, eventsPath, Event.class, total); @@ -87,6 +103,22 @@ public class GenerateEventsJob { } + private static Set parseParamAsList(final ArgumentApplicationParser parser, final String key) { + final String s = parser.get(key).trim(); + + final Set res = new HashSet<>(); + + if (s.length() > 1) { // A value of a single char (for example: '-') indicates an empty list + Arrays + .stream(s.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .forEach(res::add); + } + + return res; + } + public static Map prepareAccumulators(final SparkContext sc) { return EventFinder diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 36d0ffd1b..9124d18e3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -48,6 +48,7 @@ public class IndexOnESJob { final JavaRDD inputRdd = ClusterUtils .readPath(spark, eventsPath, Event.class) + .limit(10000) // TODO REMOVE .map(IndexOnESJob::eventAsJsonString, Encoders.STRING()) .javaRDD(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java new file mode 100644 index 000000000..eb1825fa5 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java @@ -0,0 +1,80 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; +import scala.Tuple2; + +public class JoinStep0Job { + + private static final Logger log = LoggerFactory.getLogger(JoinStep0Job.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + JoinStep0Job.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String outputPath = workingPath + 
"/joinedEntities_step0"; + log.info("outputPath: {}", outputPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, outputPath); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); + + final Dataset sources = ClusterUtils + .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); + + final Dataset datasources = ClusterUtils + .readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class); + + final TypedColumn, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator() + .toColumn(); + + final Dataset dataset = sources + .joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner") + .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); + + ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java index f9bf2d146..8e502f736 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java @@ -55,7 +55,7 @@ public class JoinStep1Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); + .readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils .readPath(spark, workingPath + "/relatedProjects", RelatedProject.class); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java new file mode 100644 index 000000000..30f5ddac3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java @@ -0,0 +1,68 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; +import eu.dnetlib.dhp.schema.oaf.Datasource; + +public class PrepareRelatedDatasourcesJob { + + private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasourcesJob.class); + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + PrepareRelatedDatasourcesJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + 
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); + + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + + final String relsPath = workingPath + "/datasources"; + log.info("relsPath: {}", relsPath); + + final SparkConf conf = new SparkConf(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils.removeDir(spark, relsPath); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources"); + + final Dataset dataset = ClusterUtils + .readPath(spark, graphPath + "/datasource", Datasource.class) + .map( + ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()), + Encoders.bean(SimpleDatasourceInfo.class)); + + ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total); + + }); + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 5ed55247b..e7abae68b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -4,8 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.EventFactory; @@ -38,6 +41,8 @@ import eu.dnetlib.pace.config.DedupConfig; public class EventFinder { + private static final Logger log = LoggerFactory.getLogger(EventFinder.class); + private static final List> matchers = new ArrayList<>(); static { matchers.add(new EnrichMissingAbstract()); @@ -69,19 +74,39 @@ public class EventFinder { } public static EventGroup generateEvents(final ResultGroup results, + final Set dsIdWhitelist, + final Set dsIdBlacklist, + final Set dsTypeWhitelist, final DedupConfig dedupConfig, final Map accumulators) { + final List> list = new ArrayList<>(); for (final OaBrokerMainEntity target : results.getData()) { - for (final UpdateMatcher matcher : matchers) { - list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators)); + if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { + for (final UpdateMatcher matcher : matchers) { + list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators)); + } } } return asEventGroup(list); } + private static boolean verifyTarget(final OaBrokerMainEntity target, + final Set dsIdWhitelist, + final Set dsIdBlacklist, + final Set dsTypeWhitelist) { + + if (dsIdWhitelist.contains(target.getCollectedFromId())) { + return true; + } else if (dsIdBlacklist.contains(target.getCollectedFromId())) { + return false; + } else { + return dsTypeWhitelist.contains(target.getCollectedFromType()); + } + } + private static EventGroup asEventGroup(final List> list) { final EventGroup events = new EventGroup(); list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java 
b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 048683b50..0586b681e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -113,6 +113,7 @@ public final class UpdateInfo { final String provId = getSource().getOpenaireId(); final String provRepo = getSource().getCollectedFromName(); + final String provType = getSource().getCollectedFromType(); final String provUrl = getSource() .getInstances() @@ -122,7 +123,7 @@ public final class UpdateInfo { .orElse(null); ; - final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl); + final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provType, provUrl); final OaBrokerEventPayload res = new OaBrokerEventPayload(); res.setResult(target); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java new file mode 100644 index 000000000..ccd15c8c6 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java @@ -0,0 +1,59 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import scala.Tuple2; + +public class AddDatasourceTypeAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { + + /** + * + */ + private static final long serialVersionUID = 8788588975496014728L; + + @Override + public OaBrokerMainEntity zero() { + return new OaBrokerMainEntity(); + } + + @Override + public OaBrokerMainEntity finish(final OaBrokerMainEntity g) { + return g; + } + + @Override + public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, + final Tuple2 t) { + final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? 
g : t._1; + if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) { + res.setCollectedFromType(t._2.getType()); + } + return res; + + } + + @Override + public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { + if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) { + return g1; + } else { + return g2; + } + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(OaBrokerMainEntity.class); + } +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java new file mode 100644 index 000000000..966f63fa0 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java @@ -0,0 +1,40 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import java.io.Serializable; + +public class SimpleDatasourceInfo implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 2996609859416024734L; + + private String id; + private String type; + + public SimpleDatasourceInfo() { + } + + public SimpleDatasourceInfo(final String id, final String type) { + this.id = id; + this.type = type; + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 7667bfba7..a0c7b00db 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -16,6 +16,21 @@ dedupConfProfId the id of a valid Dedup Configuration Profile + + + datasourceIdWhitelist + - + a white list (comma separated, - for empty list) of datasource ids + + + datasourceTypeWhitelist + - + a white list (comma separated, - for empty list) of datasource types + + + datasourceIdBlacklist + - + a black list (comma separated, - for empty list) of datasource ids esIndexName @@ -96,6 +111,7 @@ + @@ -125,6 +141,30 @@ + + + + yarn + cluster + PrepareRelatedDatasourcesJob + eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasourcesJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + @@ -223,7 +263,31 @@ - + + + + yarn + cluster + JoinStep0 + eu.dnetlib.dhp.broker.oa.JoinStep0Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + @@ -365,6 +429,9 @@ --workingPath${workingPath} --isLookupUrl${isLookupUrl} --dedupConfProfile${dedupConfProfId} + --datasourceIdWhitelist${datasourceIdWhitelist} + --datasourceTypeWhitelist${datasourceTypeWhitelist} + --datasourceIdBlacklist${datasourceIdBlacklist} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index 7ae076159..c545884f9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -16,5 +16,23 @@ "paramLongName": "dedupConfProfile", "paramDescription": "the id of a valid Dedup Configuration Profile", "paramRequired": true + }, + { + "paramName": "datasourceIdWhitelist", + "paramLongName": "datasourceIdWhitelist", + "paramDescription": "a white list (comma separated, - for empty list) of datasource ids", + "paramRequired": true + }, + { + "paramName": "datasourceTypeWhitelist", + "paramLongName": "datasourceTypeWhitelist", + "paramDescription": "a white list (comma separated, - for empty list) of datasource types", + "paramRequired": true + }, + { + "paramName": "datasourceIdBlacklist", + "paramLongName": "datasourceIdBlacklist", + "paramDescription": "a black list (comma separated, - for empty list) of datasource ids", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 9128c9820..b38290448 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -73,23 +73,24 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + + yarn cluster - GenerateEventsJob - eu.dnetlib.dhp.broker.oa.GenerateEventsJob + IndexOnESJob + eu.dnetlib.dhp.broker.oa.IndexOnESJob dhp-broker-events-${projectVersion}.jar - --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="2" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --isLookupUrl${isLookupUrl} - --dedupConfProfile${dedupConfProfId} + --index${esIndexName} + --esHost${esIndexHost}
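
Notes (illustrative sketches, not part of the diff):

On the list-valued parameters: GenerateEventsJob.parseParamAsList treats any single-character value (the workflow defaults use "-") as an empty list; anything longer is split on commas, with tokens trimmed and blank entries dropped. A minimal, self-contained sketch of that convention; the datasource ids are invented:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ListParamDemo {

	// Mirrors the convention of GenerateEventsJob.parseParamAsList: a value of a
	// single char (e.g. '-') indicates an empty list; anything longer is split on
	// commas, with tokens trimmed and blank entries dropped.
	static Set<String> parseAsList(final String s) {
		final Set<String> res = new HashSet<>();
		final String trimmed = s.trim();
		if (trimmed.length() > 1) {
			Arrays
				.stream(trimmed.split(","))
				.map(String::trim)
				.filter(tok -> !tok.isEmpty())
				.forEach(res::add);
		}
		return res;
	}

	public static void main(final String[] args) {
		System.out.println(parseAsList("-")); // []
		System.out.println(parseAsList("ds1, ds2 ,")); // [ds1, ds2] (set order may vary)
	}
}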
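
On the filtering precedence: EventFinder.verifyTarget checks the datasource id whitelist first, then the id blacklist, and finally falls back to the datasource-type whitelist. A plain-Java sketch of the same decision order; the ids and the "journal" type value are invented:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class VerifyTargetDemo {

	// Same decision order as EventFinder.verifyTarget: id whitelist first,
	// then id blacklist, then fall back to the datasource-type whitelist.
	static boolean verify(final String dsId, final String dsType,
		final Set<String> idWhitelist, final Set<String> idBlacklist, final Set<String> typeWhitelist) {
		if (idWhitelist.contains(dsId)) {
			return true;
		} else if (idBlacklist.contains(dsId)) {
			return false;
		} else {
			return typeWhitelist.contains(dsType);
		}
	}

	public static void main(final String[] args) {
		final Set<String> wl = new HashSet<>(Arrays.asList("ds1"));
		final Set<String> bl = new HashSet<>(Arrays.asList("ds1", "ds2"));
		final Set<String> types = new HashSet<>(Arrays.asList("journal"));
		System.out.println(verify("ds1", "other", wl, bl, types)); // true: a whitelisted id beats the blacklist
		System.out.println(verify("ds2", "journal", wl, bl, types)); // false: a blacklisted id beats the type rule
		System.out.println(verify("ds3", "journal", wl, bl, types)); // true: accepted by datasource type
	}
}

Note that, as written, a target whose datasource appears in neither id list is accepted only when its type is whitelisted, so with all three parameters left at "-" the check rejects every target.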
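
On the new enrichment step: PrepareRelatedDatasourcesJob projects each graph Datasource down to an (id, type) pair, and JoinStep0Job joins those pairs with the simple entities so that every OaBrokerMainEntity carries its collectedFromType before the later join steps run (JoinStep1Job now reads joinedEntities_step0 instead of simpleEntities). Below is a sketch of the same join/group/aggregate shape on in-memory data, assuming a local Spark 2.x session, the project beans on the classpath, and setOpenaireId/setCollectedFromId setters on OaBrokerMainEntity; the ids and the "journal" type are invented:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;

import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
import scala.Tuple2;

public class JoinStep0Demo {

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate();

		final OaBrokerMainEntity entity = new OaBrokerMainEntity();
		entity.setOpenaireId("result-1"); // hypothetical identifiers
		entity.setCollectedFromId("ds-1");

		final Dataset<OaBrokerMainEntity> sources = spark
			.createDataset(Arrays.asList(entity), Encoders.bean(OaBrokerMainEntity.class));

		final Dataset<SimpleDatasourceInfo> datasources = spark
			.createDataset(
				Arrays.asList(new SimpleDatasourceInfo("ds-1", "journal")),
				Encoders.bean(SimpleDatasourceInfo.class));

		final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator()
			.toColumn();

		// Same shape as JoinStep0Job: the inner join pairs each entity with its
		// datasource, and the aggregator copies the type onto the entity.
		final Dataset<OaBrokerMainEntity> enriched = sources
			.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner")
			.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
			.agg(aggr)
			.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));

		enriched.show(); // the single row now carries collectedFromType = "journal"

		spark.stop();
	}
}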
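
The Aggregator contract can also be exercised without a cluster, since zero/reduce/merge are ordinary methods: reduce keeps an already-populated buffer (or adopts the joined entity) and stamps the datasource type on it, while merge prefers the partial aggregate that already carries both the id and the type. A minimal sketch, with the same invented values as above:

import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo;
import scala.Tuple2;

public class AggregatorContractDemo {

	public static void main(final String[] args) {
		final AddDatasourceTypeAggregator aggr = new AddDatasourceTypeAggregator();

		final OaBrokerMainEntity entity = new OaBrokerMainEntity();
		entity.setOpenaireId("result-1"); // hypothetical id; bean setter assumed

		// reduce() adopts the joined entity (the zero buffer has a blank id)
		// and stamps the datasource type onto it.
		final OaBrokerMainEntity reduced = aggr
			.reduce(aggr.zero(), new Tuple2<>(entity, new SimpleDatasourceInfo("ds-1", "journal")));
		System.out.println(reduced.getCollectedFromType()); // journal

		// merge() keeps the partial aggregate that already has both id and type.
		final OaBrokerMainEntity merged = aggr.merge(aggr.zero(), reduced);
		System.out.println(merged.getOpenaireId()); // result-1
	}
}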
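
On parameter wiring: generate_events.json declares the three new parameters as required, the generate_all workflow passes them to GenerateEventsJob with "-" defaults, and the job reads them back through ArgumentApplicationParser. The sketch below inlines a pared-down copy of one JSON entry; the double-dash flag/value argument pair is an assumption based on how the workflow's arg elements are written:

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class ParamWiringDemo {

	public static void main(final String[] args) throws Exception {
		// Pared-down copy of one of the new entries in generate_events.json.
		final String json = "[{"
			+ "\"paramName\": \"datasourceIdWhitelist\","
			+ "\"paramLongName\": \"datasourceIdWhitelist\","
			+ "\"paramDescription\": \"a white list (comma separated, - for empty list) of datasource ids\","
			+ "\"paramRequired\": true}]";

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(json);

		// The Oozie action passes the flag and the property value as arguments;
		// "-" is the documented placeholder for an empty list.
		parser.parseArgument(new String[] { "--datasourceIdWhitelist", "-" });

		System.out.println(parser.get("datasourceIdWhitelist")); // prints: -
	}
}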