From 51321c2701b22a617623f0e89e7a0a1a6b4bf09d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 17 Sep 2020 11:38:07 +0200 Subject: [PATCH] partition of events by opedoarId --- .../dhp/broker/oa/IndexNotificationsJob.java | 17 ++- .../broker/oa/PartitionEventsByDsIdJob.java | 113 +++++++++++++++ .../oozie_app/config-default.xml | 18 +++ .../notifications_only/oozie_app/workflow.xml | 137 ++++++++++++++++++ .../broker/oa/partial/oozie_app/workflow.xml | 15 +- pom.xml | 2 +- 6 files changed, 286 insertions(+), 16 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index cb7acb46df..792a2354a4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,8 +47,9 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -116,7 +117,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -147,15 +149,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java new file mode 100644 index 0000000000..0748624f77 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -0,0 +1,113 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.broker.api.ShortEventMessage; +import eu.dnetlib.broker.objects.OaBrokerEventPayload; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import scala.Tuple2; + +public class PartitionEventsByDsIdJob { + + private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class); + private static final String OPENDOAR_NSPREFIX = "opendoar____::"; + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; + log.info("partitionPath: {}", partitionPath); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils + .readPath(spark, eventsPath, Event.class) + .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) + .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) + .map( + e -> new Tuple2<>( + StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX), + messageFromNotification(e)), + Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class))) + .write() + .partitionBy("_1") + .mode(SaveMode.Overwrite) + .json(partitionPath); + + }); + renameSubDirs(partitionPath); + + } + + private static void renameSubDirs(final String path) throws IOException { + final String prefix = "_1="; + final FileSystem fs = FileSystem.get(new Configuration()); + + log.info("** Renaming subdirs of " + path); + for (final FileStatus fileStatus : fs.listStatus(new Path(path))) { + if (fileStatus.isDirectory()) { + final Path oldPath = fileStatus.getPath(); + final String oldName = oldPath.getName(); + if (oldName.startsWith(prefix)) { + final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix)); + log.info(" * " + oldPath.getName() + " -> " + newPath.getName()); + fs.rename(oldPath, newPath); + } + } + } + } + + private static ShortEventMessage messageFromNotification(final Event e) { + final Gson gson = new Gson(); + + final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class); + + final ShortEventMessage res = new ShortEventMessage(); + + res.setOriginalId(payload.getResult().getOriginalId()); + res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); + res.setTopic(e.getTopic()); + res.setTrust(payload.getTrust()); + res.generateMessageFromObject(payload.getHighlight()); + + return res; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml new file mode 100644 index 0000000000..2e0ed9aeea --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml new file mode 100644 index 0000000000..f629c2101e --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -0,0 +1,137 @@ + + + + + graphInputPath + the path where the graph is stored + + + workingPath + the path where the the generated data will be stored + + + datasourceIdWhitelist + - + a white list (comma separeted, - for empty list) of datasource ids + + + datasourceTypeWhitelist + - + a white list (comma separeted, - for empty list) of datasource types + + + datasourceIdBlacklist + - + a black list (comma separeted, - for empty list) of datasource ids + + + esEventIndexName + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications + + + esIndexHost + the elasticsearch host + + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) + + + brokerApiBaseUrl + the url of the broker service api + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esNotificationsIndexName} + --esHost${esIndexHost} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index f629c2101e..a9741a3074 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -99,38 +99,35 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexNotificationsOnESJob - eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + PartitionEventsByDsIdJob + eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob dhp-broker-events-${projectVersion}.jar + --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} --workingPath${workingPath} - --index${esNotificationsIndexName} - --esHost${esIndexHost} - --brokerApiBaseUrl${brokerApiBaseUrl} - diff --git a/pom.xml b/pom.xml index e88e1d51b7..52edd497f4 100644 --- a/pom.xml +++ b/pom.xml @@ -663,7 +663,7 @@ 3.3.3 3.4.2 [2.12,3.0) - 3.1.0 + 3.1.1 7.5.0 4.7.2 1.1