diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
index 457b82517..1ae241e34 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -59,6 +59,9 @@ public class GenerateEventsJob {
 		final Set<String> dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist");
 		log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
 
+		final Set<String> topicWhitelist = ClusterUtils.parseParamAsList(parser, "topicWhitelist");
+		log.info("topicWhitelist: {}", StringUtils.join(topicWhitelist, ","));
+
 		final SparkConf conf = new SparkConf();
 
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
@@ -75,7 +78,7 @@ public class GenerateEventsJob {
 			final Dataset<Event> dataset = groups
 				.map(
 					g -> EventFinder
-						.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
+						.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, topicWhitelist, accumulators),
 					Encoders
 						.bean(EventGroup.class))
 				.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index 1ab56cc34..103751f95 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -76,6 +76,7 @@ public class EventFinder {
 		final Set<String> dsIdWhitelist,
 		final Set<String> dsIdBlacklist,
 		final Set<String> dsTypeWhitelist,
+		final Set<String> topicWhitelist,
 		final Map<String, LongAccumulator> accumulators) {
 
 		final List<UpdateInfo<?>> list = new ArrayList<>();
@@ -84,7 +85,13 @@ public class EventFinder {
 			for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
 				if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
 					for (final UpdateMatcher<?> matcher : matchers) {
-						list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
+						for (final UpdateInfo<?> info : matcher
+							.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators)) {
+							if (topicWhitelist == null || topicWhitelist.isEmpty()
+								|| topicWhitelist.contains(info.getTopic().getPath())) {
+								list.add(info);
+							}
+						}
 					}
 				}
 			}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 1a1df418b..ea81c90d0 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -24,6 +24,11 @@
             <description>a black list (comma separeted, - for empty list) of datasource ids</description>
         </property>
+        <property>
+            <name>topicWhitelist</name>
+            <value>*</value>
+            <description>a white list (comma separated, * for all) of topics</description>
+        </property>
         <property>
             <name>esEventIndexName</name>
             <description>the elasticsearch index name for events</description>
         </property>
@@ -447,6 +452,7 @@
                 <arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
                 <arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
                 <arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
+                <arg>--topicWhitelist</arg><arg>${topicWhitelist}</arg>
             </spark>
             <ok to="index_event_subset"/>
             <error to="Kill"/>
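Note (not part of the patch): a minimal, self-contained sketch of the acceptance rule the new loop in EventFinder applies to each candidate update — a null or empty whitelist disables filtering, otherwise only updates whose topic path is listed are kept. The topic paths used in main() are illustrative values only.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TopicWhitelistCheckSketch {

    // Mirrors the condition added in EventFinder: an absent or empty whitelist
    // lets every topic through; otherwise only whitelisted topic paths survive.
    static boolean acceptsTopic(final Set<String> topicWhitelist, final String topicPath) {
        return topicWhitelist == null || topicWhitelist.isEmpty() || topicWhitelist.contains(topicPath);
    }

    public static void main(final String[] args) {
        final Set<String> whitelist = new HashSet<>(Arrays.asList("ENRICH/MISSING/PID", "ENRICH/MISSING/ABSTRACT"));
        System.out.println(acceptsTopic(whitelist, "ENRICH/MISSING/PID"));        // true: listed
        System.out.println(acceptsTopic(whitelist, "ENRICH/MORE/SUBJECT"));       // false: filtered out
        System.out.println(acceptsTopic(new HashSet<>(), "ENRICH/MORE/SUBJECT")); // true: empty whitelist accepts all
    }
}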
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
index a482ee2dc..e803bb5b9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
@@ -28,5 +28,11 @@
     "paramLongName": "datasourceIdBlacklist",
     "paramDescription": "a black list (comma separeted, - for empty list) of datasource ids",
     "paramRequired": true
+  },
+  {
+    "paramName": "topicWhitelist",
+    "paramLongName": "topicWhitelist",
+    "paramDescription": "a white list (comma separated, * for all) of topics",
+    "paramRequired": true
   }
 ]
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml
new file mode 100644
index 000000000..2e0ed9aee
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+</configuration>
\ No newline at end of file
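Note (not part of the patch): the new topicWhitelist argument reaches the job as a single comma-separated string whose default is "*", and the filter in EventFinder treats an empty set as "accept everything". The sketch below is an assumption about how such a value could be turned into the Set of topic paths the job expects; ClusterUtils.parseParamAsList is not shown in this diff, and parseAsList here is a hypothetical stand-in, not its actual implementation.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class WhitelistParamSketch {

    // Hypothetical parser for the comma-separated whitelist parameters: a single-character
    // value such as "*" or "-" is read as "no restriction" and yields an empty set, which
    // the EventFinder check treats as "accept every topic"; anything else is split on commas.
    static Set<String> parseAsList(final String raw) {
        final String s = raw == null ? "" : raw.trim();
        if (s.length() <= 1) {
            return new HashSet<>();
        }
        return Arrays.stream(s.split(","))
            .map(String::trim)
            .filter(x -> !x.isEmpty())
            .collect(Collectors.toCollection(HashSet::new));
    }

    public static void main(final String[] args) {
        System.out.println(parseAsList("*"));                                   // [] -> every topic passes
        System.out.println(parseAsList("ENRICH/MISSING/PID, ENRICH/MORE/PID")); // two whitelisted topic paths
    }
}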
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml
new file mode 100644
index 000000000..43c26f3a3
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml
@@ -0,0 +1,116 @@
+<workflow-app name="reindex_events" xmlns="uri:oozie:workflow:0.5">
+
+    <parameters>
+        <property>
+            <name>outputDir</name>
+            <description>the path where the generated data will be stored</description>
+        </property>
+        <property>
+            <name>esEventIndexName</name>
+            <description>the elasticsearch index name for events</description>
+        </property>
+        <property>
+            <name>esIndexHost</name>
+            <description>the elasticsearch host</description>
+        </property>
+        <property>
+            <name>maxIndexedEventsForDsAndTopic</name>
+            <description>the max number of events for each (datasource, topic) pair</description>
+        </property>
+        <property>
+            <name>brokerApiBaseUrl</name>
+            <description>the url of the broker service api</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="index_event_subset"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="index_event_subset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>IndexEventSubsetOnESJob</name>
+            <class>eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob</class>
+            <jar>dhp-broker-events-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.dynamicAllocation.maxExecutors="8"
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=3840
+            </spark-opts>
+            <arg>--outputDir</arg><arg>${outputDir}</arg>
+            <arg>--index</arg><arg>${esEventIndexName}</arg>
+            <arg>--esHost</arg><arg>${esIndexHost}</arg>
+            <arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
+            <arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+
+</workflow-app>
\ No newline at end of file