From ec94cc9b939d4cfe92643d0c1d14afe52e403559 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 1 Oct 2021 09:41:27 +0200 Subject: [PATCH 1/8] IndexNotificationsJob test: persist contents on HDFS instead of passing them to ES --- .../dhp/broker/oa/IndexNotificationsJob.java | 29 ++----------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 80549e1ce..7b65c27ab 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -101,33 +101,10 @@ public class IndexNotificationsJob { ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); - final JavaRDD inputRdd = notifications + notifications .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) - .javaRDD(); - - final Map esCfg = new HashMap<>(); - // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); - - esCfg.put("es.index.auto.create", "false"); - esCfg.put("es.nodes", indexHost); - esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); - esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); - esCfg.put("es.batch.size.entries", esBatchSizeEntries); - esCfg.put("es.nodes.wan.only", esNodesWanOnly); - - log.info("*** Start indexing"); - JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); - log.info("*** End indexing"); - - log.info("*** Deleting old notifications"); - final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** Deleted notifications: " + message); - - log.info("*** sendNotifications (emails, ...)"); - sendNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** ALL done."); - + .javaRDD() + .saveAsTextFile("/tmp/IndexNotificationsJob_test_6504"); } } From b01cd521b0eede2ad6ab8ab95e22f6a3dd14cae1 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 1 Oct 2021 11:26:33 +0200 Subject: [PATCH 2/8] removed configuration specifying the limit to 8 for spark.dynamicAllocation.maxExecutors --- .../dhp/broker/oa/notifications_only/oozie_app/workflow.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 248326d57..70ab7d344 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -135,7 +135,6 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} From 31a6ad1d79cea595d85a1ef9a15fc6ea714a97f3 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 4 Oct 2021 12:01:56 +0200 Subject: [PATCH 3/8] optimization of verifySubsriptions() --- .../dhp/broker/oa/IndexNotificationsJob.java | 39 +++++---- .../broker/oa/IndexNotificationsJobTest.java | 84 +++++++++++++++++++ 2 files changed, 105 insertions(+), 18 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 7b65c27ab..c66e5f4f4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -17,7 +17,6 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -25,7 +24,6 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; -import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,13 +44,14 @@ public class IndexNotificationsJob { private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class); + private static Map>> conditionsForSubscriptions = new HashMap<>(); + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -89,6 +88,8 @@ public class IndexNotificationsJob { final List subscriptions = listSubscriptions(brokerApiBaseUrl); + initConditionsForSubscriptions(subscriptions); + log.info("Number of subscriptions: " + subscriptions.size()); if (subscriptions.size() > 0) { @@ -96,9 +97,7 @@ public class IndexNotificationsJob { final Encoder nEncoder = Encoders.bean(Notification.class); final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) - .map( - (MapFunction) e -> generateNotifications(e, subscriptions, startTime), - ngEncoder) + .map((MapFunction) e -> generateNotifications(e, subscriptions, startTime), ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); notifications @@ -108,20 +107,27 @@ public class IndexNotificationsJob { } } - private static NotificationGroup generateNotifications(final Event e, + protected static void initConditionsForSubscriptions(final List subscriptions) { + subscriptions.forEach(s -> conditionsForSubscriptions.put(s.getSubscriptionId(), s.conditionsAsMap())); + } + + protected static NotificationGroup generateNotifications(final Event e, final List subscriptions, final long date) { final List list = subscriptions .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsAsMap(s))) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); return new NotificationGroup(list); } + private static Map> conditionsAsMap(final Subscription s) { + return conditionsForSubscriptions.get(s.getSubscriptionId()); + } + private static Notification generateNotification(final Subscription s, final Event e, final long date) { final Notification n = new Notification(); n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); @@ -145,18 +151,15 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java new file mode 100644 index 000000000..ea5dbd5d1 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; + +class IndexNotificationsJobTest { + + private List subscriptions; + + @BeforeEach + void setUp() throws Exception { + final Subscription s = new Subscription(); + s.setTopic("ENRICH/MISSING/PID"); + s.setConditions("[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); + subscriptions = Arrays.asList(s); + IndexNotificationsJob.initConditionsForSubscriptions(subscriptions); + } + + @Test + void testGenerateNotifications_invalid_topic() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); + + for (int i = 0; i < 10; i++) { + final long start = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final long end = System.currentTimeMillis(); + + System.out.println("no topic - execution time (ms): " + (end - start)); + + assertEquals(0, res.getData().size()); + } + } + + @Test + void testGenerateNotifications_topic_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + for (int i = 0; i < 10; i++) { + final long start = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final long end = System.currentTimeMillis(); + + System.out.println("topic match - execution time (ms): " + (end - start)); + + assertEquals(1, res.getData().size()); + } + } + + @Test + void testGenerateNotifications_topic_no_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + for (int i = 0; i < 10; i++) { + final long start = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final long end = System.currentTimeMillis(); + + System.out.println("topic no match - execution time (ms): " + (end - start)); + + assertEquals(0, res.getData().size()); + } + } + +} From 0a9ef34b56cdc1fb9783a0efb84dab29b03ebcbb Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 4 Oct 2021 15:46:12 +0200 Subject: [PATCH 4/8] test --- .../broker/oa/IndexNotificationsJobTest.java | 84 +++++++++++++------ 1 file changed, 60 insertions(+), 24 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java index ea5dbd5d1..6010da2c3 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java @@ -18,6 +18,8 @@ class IndexNotificationsJobTest { private List subscriptions; + private static final int N_TIMES = 1_000_000; + @BeforeEach void setUp() throws Exception { final Subscription s = new Subscription(); @@ -32,15 +34,8 @@ class IndexNotificationsJobTest { final Event event = new Event(); event.setTopic("ENRICH/MISSING/PROJECT"); - for (int i = 0; i < 10; i++) { - final long start = System.currentTimeMillis(); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); - final long end = System.currentTimeMillis(); - - System.out.println("no topic - execution time (ms): " + (end - start)); - - assertEquals(0, res.getData().size()); - } + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + assertEquals(0, res.getData().size()); } @Test @@ -51,15 +46,8 @@ class IndexNotificationsJobTest { event.getMap().setTargetDatasourceName("reposiTUm"); event.getMap().setTrust(0.8f); - for (int i = 0; i < 10; i++) { - final long start = System.currentTimeMillis(); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); - final long end = System.currentTimeMillis(); - - System.out.println("topic match - execution time (ms): " + (end - start)); - - assertEquals(1, res.getData().size()); - } + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + assertEquals(1, res.getData().size()); } @Test @@ -70,15 +58,63 @@ class IndexNotificationsJobTest { event.getMap().setTargetDatasourceName("Puma"); event.getMap().setTrust(0.8f); - for (int i = 0; i < 10; i++) { - final long start = System.currentTimeMillis(); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); - final long end = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + assertEquals(0, res.getData().size()); + } - System.out.println("topic no match - execution time (ms): " + (end - start)); + @Test + void testGenerateNotifications_invalid_topic_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); - assertEquals(0, res.getData().size()); + // warm up + IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + IndexNotificationsJob.generateNotifications(event, subscriptions, 0); } + final long end = System.currentTimeMillis(); + System.out.println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + + } + + @Test + void testGenerateNotifications_topic_match_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + // warm up + IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + } + final long end = System.currentTimeMillis(); + System.out.println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + } + + @Test + void testGenerateNotifications_topic_no_match_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + // warm up + IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + } + final long end = System.currentTimeMillis(); + System.out.println(String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } } From 8bbaa173354e99610053c5adfefd3f1f5c78f43f Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 5 Oct 2021 09:20:37 +0200 Subject: [PATCH 5/8] reimplemented of conditions cache as a non static variable --- .../dhp/broker/oa/IndexNotificationsJob.java | 42 +++++++++++-------- .../broker/oa/IndexNotificationsJobTest.java | 41 +++++++++++------- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index c66e5f4f4..5dc3315c4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -44,14 +44,13 @@ public class IndexNotificationsJob { private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class); - private static Map>> conditionsForSubscriptions = new HashMap<>(); - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -88,16 +87,19 @@ public class IndexNotificationsJob { final List subscriptions = listSubscriptions(brokerApiBaseUrl); - initConditionsForSubscriptions(subscriptions); - log.info("Number of subscriptions: " + subscriptions.size()); if (subscriptions.size() > 0) { + final Map>> conditionsMap = prepareConditionsMap(subscriptions); + final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); final Encoder nEncoder = Encoders.bean(Notification.class); final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) - .map((MapFunction) e -> generateNotifications(e, subscriptions, startTime), ngEncoder) + .map( + (MapFunction) e -> generateNotifications( + e, subscriptions, conditionsMap, startTime), + ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); notifications @@ -107,27 +109,28 @@ public class IndexNotificationsJob { } } - protected static void initConditionsForSubscriptions(final List subscriptions) { - subscriptions.forEach(s -> conditionsForSubscriptions.put(s.getSubscriptionId(), s.conditionsAsMap())); + protected static Map>> prepareConditionsMap( + final List subscriptions) { + final Map>> map = new HashMap<>(); + subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); + return map; } protected static NotificationGroup generateNotifications(final Event e, final List subscriptions, + final Map>> conditionsMap, final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), conditionsAsMap(s))) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); return new NotificationGroup(list); } - private static Map> conditionsAsMap(final Subscription s) { - return conditionsForSubscriptions.get(s.getSubscriptionId()); - } - private static Notification generateNotification(final Subscription s, final Event e, final long date) { final Notification n = new Notification(); n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); @@ -151,15 +154,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java index 6010da2c3..8ecb3061b 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java @@ -5,10 +5,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import eu.dnetlib.dhp.broker.model.ConditionParams; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.MappedFields; import eu.dnetlib.dhp.broker.model.Subscription; @@ -18,15 +20,19 @@ class IndexNotificationsJobTest { private List subscriptions; + private Map>> conditionsMap; + private static final int N_TIMES = 1_000_000; @BeforeEach void setUp() throws Exception { final Subscription s = new Subscription(); s.setTopic("ENRICH/MISSING/PID"); - s.setConditions("[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); + s + .setConditions( + "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); subscriptions = Arrays.asList(s); - IndexNotificationsJob.initConditionsForSubscriptions(subscriptions); + conditionsMap = IndexNotificationsJob.prepareConditionsMap(subscriptions); } @Test @@ -34,7 +40,8 @@ class IndexNotificationsJobTest { final Event event = new Event(); event.setTopic("ENRICH/MISSING/PROJECT"); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); assertEquals(0, res.getData().size()); } @@ -46,7 +53,8 @@ class IndexNotificationsJobTest { event.getMap().setTargetDatasourceName("reposiTUm"); event.getMap().setTrust(0.8f); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); assertEquals(1, res.getData().size()); } @@ -58,7 +66,8 @@ class IndexNotificationsJobTest { event.getMap().setTargetDatasourceName("Puma"); event.getMap().setTrust(0.8f); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); assertEquals(0, res.getData().size()); } @@ -68,14 +77,15 @@ class IndexNotificationsJobTest { event.setTopic("ENRICH/MISSING/PROJECT"); // warm up - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); final long start = System.currentTimeMillis(); for (int i = 0; i < N_TIMES; i++) { - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); } final long end = System.currentTimeMillis(); - System.out.println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + System.out + .println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } @@ -88,14 +98,15 @@ class IndexNotificationsJobTest { event.getMap().setTrust(0.8f); // warm up - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); final long start = System.currentTimeMillis(); for (int i = 0; i < N_TIMES; i++) { - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); } final long end = System.currentTimeMillis(); - System.out.println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + System.out + .println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } @Test @@ -107,14 +118,16 @@ class IndexNotificationsJobTest { event.getMap().setTrust(0.8f); // warm up - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); final long start = System.currentTimeMillis(); for (int i = 0; i < N_TIMES; i++) { - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); } final long end = System.currentTimeMillis(); - System.out.println(String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + System.out + .println( + String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } } From 69008e20c2e1c4e6af8f45c8ad4acd17bdf54fe7 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 5 Oct 2021 11:58:20 +0200 Subject: [PATCH 6/8] log and tests --- .../dhp/broker/oa/IndexNotificationsJob.java | 2 + .../oa/samples/SimpleVariableJobTest.java | 132 ++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 5dc3315c4..fb2e26ba2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -92,6 +92,8 @@ public class IndexNotificationsJob { if (subscriptions.size() > 0) { final Map>> conditionsMap = prepareConditionsMap(subscriptions); + log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); + final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); final Encoder nEncoder = Encoders.bean(Notification.class); final Dataset notifications = ClusterUtils diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java new file mode 100644 index 000000000..a6d1c89d3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java @@ -0,0 +1,132 @@ + +package eu.dnetlib.dhp.broker.oa.samples; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.MapCondition; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +@Disabled +public class SimpleVariableJobTest { + + private static final Logger log = LoggerFactory.getLogger(SimpleVariableJobTest.class); + + private static Path workingDir; + + private static SparkSession spark; + + private final static List inputList = new ArrayList<>(); + + private static final Map>> staticMap = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + + workingDir = Files.createTempDirectory(SimpleVariableJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + final SparkConf conf = new SparkConf(); + conf.setAppName(SimpleVariableJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + // conf.set("spark.sql.warehouse.dir", workingDir.toString()); + // conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(SimpleVariableJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + + for (int i = 0; i < 1_000_000; i++) { + inputList.add("record " + i); + } + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testSimpleVariableJob() throws Exception { + final Map>> map = fillMap(); + + final long n = spark + .createDataset(inputList, Encoders.STRING()) + .filter(s -> filter(map.get(s))) + .map((MapFunction) s -> s.toLowerCase(), Encoders.STRING()) + .count(); + + System.out.println(n); + } + + @Test + public void testSimpleVariableJob_static() throws Exception { + + staticMap.putAll(fillMap()); + + final long n = spark + .createDataset(inputList, Encoders.STRING()) + .filter(s -> filter(staticMap.get(s))) + .map((MapFunction) s -> s.toLowerCase(), Encoders.STRING()) + .count(); + + System.out.println(n); + } + + private static Map>> fillMap() + throws JsonParseException, JsonMappingException, IOException { + final String s = "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"; + + final ObjectMapper mapper = new ObjectMapper(); + final List list = mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class)); + final Map> conditions = list + .stream() + .filter(mc -> !mc.getListParams().isEmpty()) + .collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams)); + + final Map>> map = new HashMap<>(); + inputList.forEach(i -> map.put(i, conditions)); + return map; + } + + private static boolean filter(final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact("reposiTUm", conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + return true; + } + +} From 210d6c0e6d002d76ada343ad4f526cc60e70b41d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 5 Oct 2021 13:57:46 +0200 Subject: [PATCH 7/8] generateNotificationsJob and indexNotificationsJob --- .../broker/oa/GenerateNotificationsJob.java | 184 ++++++++++++++++++ .../dhp/broker/oa/IndexNotificationsJob.java | 168 +++------------- .../oa/generate_all/oozie_app/workflow.xml | 24 +++ .../dhp/broker/oa/generate_notifications.json | 14 ++ .../notifications_only/oozie_app/workflow.xml | 27 ++- 5 files changed, 279 insertions(+), 138 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java new file mode 100644 index 000000000..21c6c64a6 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java @@ -0,0 +1,184 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Notification; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +public class GenerateNotificationsJob { + + private static final Logger log = LoggerFactory.getLogger(GenerateNotificationsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString(GenerateNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_notifications.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("outputDir") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String notificationsPath = parser.get("outputDir") + "/notifications"; + log.info("notificationsPath: {}", notificationsPath); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_notifications"); + + final long startTime = new Date().getTime(); + + final List subscriptions = listSubscriptions(brokerApiBaseUrl); + + log.info("Number of subscriptions: " + subscriptions.size()); + + if (subscriptions.size() > 0) { + final Map>> conditionsMap = prepareConditionsMap(subscriptions); + + log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); + + final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); + final Encoder nEncoder = Encoders.bean(Notification.class); + final Dataset notifications = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .map((MapFunction) e -> generateNotifications(e, subscriptions, conditionsMap, startTime), ngEncoder) + .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); + + ClusterUtils.save(notifications, notificationsPath, Notification.class, total); + } + } + + protected static Map>> prepareConditionsMap( + final List subscriptions) { + final Map>> map = new HashMap<>(); + subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); + return map; + } + + protected static NotificationGroup generateNotifications(final Event e, + final List subscriptions, + final Map>> conditionsMap, + final long date) { + final List list = subscriptions + .stream() + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) + .map(s -> generateNotification(s, e, date)) + .collect(Collectors.toList()); + + return new NotificationGroup(list); + } + + private static Notification generateNotification(final Subscription s, final Event e, final long date) { + final Notification n = new Notification(); + n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); + n.setSubscriptionId(s.getSubscriptionId()); + n.setEventId(e.getEventId()); + n.setProducerId(e.getProducerId()); + n.setTopic(e.getTopic()); + n.setPayload(e.getPayload()); + n.setMap(e.getMap()); + n.setDate(date); + return n; + } + + private static boolean verifyConditions(final MappedFields map, + final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + + if (conditions.containsKey("trust") + && !SubscriptionUtils + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + return false; + } + + if (conditions.containsKey("targetDateofacceptance") && !conditions + .get("targetDateofacceptance") + .stream() + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + return false; + } + + if (conditions.containsKey("targetResultTitle") + && !conditions + .get("targetResultTitle") + .stream() + .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetAuthors") + && !conditions + .get("targetAuthors") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { + return false; + } + + return !conditions.containsKey("targetSubjects") + || conditions + .get("targetSubjects") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue())); + + } + + private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { + final String url = brokerApiBaseUrl + "/api/subscriptions"; + final HttpGet req = new HttpGet(url); + + final ObjectMapper mapper = new ObjectMapper(); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String s = IOUtils.toString(response.getEntity().getContent()); + return mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); + } + } + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index fb2e26ba2..55e82446f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -2,28 +2,22 @@ package eu.dnetlib.dhp.broker.oa; import java.io.IOException; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,14 +25,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.broker.model.ConditionParams; -import eu.dnetlib.dhp.broker.model.Event; -import eu.dnetlib.dhp.broker.model.MappedFields; import eu.dnetlib.dhp.broker.model.Notification; -import eu.dnetlib.dhp.broker.model.Subscription; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; -import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; public class IndexNotificationsJob { @@ -48,15 +36,14 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("outputDir") + "/events"; - log.info("eventsPath: {}", eventsPath); + final String notificationsPath = parser.get("outputDir") + "/notifications"; + log.info("notificationsPath: {}", notificationsPath); final String index = parser.get("index"); log.info("index: {}", index); @@ -83,131 +70,38 @@ public class IndexNotificationsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); - final long startTime = new Date().getTime(); + final Long date = ClusterUtils + .readPath(spark, notificationsPath, Notification.class) + .first() + .getDate(); - final List subscriptions = listSubscriptions(brokerApiBaseUrl); + final JavaRDD toIndexRdd = ClusterUtils + .readPath(spark, notificationsPath, Notification.class) + .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) + .javaRDD(); - log.info("Number of subscriptions: " + subscriptions.size()); + final Map esCfg = new HashMap<>(); - if (subscriptions.size() > 0) { - final Map>> conditionsMap = prepareConditionsMap(subscriptions); + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); - log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); + log.info("*** Start indexing"); + JavaEsSpark.saveJsonToEs(toIndexRdd, index, esCfg); + log.info("*** End indexing"); - final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); - final Encoder nEncoder = Encoders.bean(Notification.class); - final Dataset notifications = ClusterUtils - .readPath(spark, eventsPath, Event.class) - .map( - (MapFunction) e -> generateNotifications( - e, subscriptions, conditionsMap, startTime), - ngEncoder) - .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); + log.info("*** Deleting old notifications"); + final String message = deleteOldNotifications(brokerApiBaseUrl, date - 1000); + log.info("*** Deleted notifications: {}", message); - notifications - .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) - .javaRDD() - .saveAsTextFile("/tmp/IndexNotificationsJob_test_6504"); - } - } + log.info("*** sendNotifications (emails, ...)"); + sendNotifications(brokerApiBaseUrl, date - 1000); + log.info("*** ALL done."); - protected static Map>> prepareConditionsMap( - final List subscriptions) { - final Map>> map = new HashMap<>(); - subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); - return map; - } - - protected static NotificationGroup generateNotifications(final Event e, - final List subscriptions, - final Map>> conditionsMap, - final long date) { - final List list = subscriptions - .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) - .map(s -> generateNotification(s, e, date)) - .collect(Collectors.toList()); - - return new NotificationGroup(list); - } - - private static Notification generateNotification(final Subscription s, final Event e, final long date) { - final Notification n = new Notification(); - n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); - n.setSubscriptionId(s.getSubscriptionId()); - n.setEventId(e.getEventId()); - n.setProducerId(e.getProducerId()); - n.setTopic(e.getTopic()); - n.setPayload(e.getPayload()); - n.setMap(e.getMap()); - n.setDate(date); - return n; - } - - private static boolean verifyConditions(final MappedFields map, - final Map> conditions) { - if (conditions.containsKey("targetDatasourceName") - && !SubscriptionUtils - .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { - return false; - } - - if (conditions.containsKey("trust") - && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { - return false; - } - - if (conditions.containsKey("targetDateofacceptance") && !conditions - .get("targetDateofacceptance") - .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { - return false; - } - - if (conditions.containsKey("targetResultTitle") - && !conditions - .get("targetResultTitle") - .stream() - .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { - return false; - } - - if (conditions.containsKey("targetAuthors") - && !conditions - .get("targetAuthors") - .stream() - .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { - return false; - } - - return !conditions.containsKey("targetSubjects") - || conditions - .get("targetSubjects") - .stream() - .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue())); - - } - - private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { - final String url = brokerApiBaseUrl + "/api/subscriptions"; - final HttpGet req = new HttpGet(url); - - final ObjectMapper mapper = new ObjectMapper(); - - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - final String s = IOUtils.toString(response.getEntity().getContent()); - return mapper - .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); - } - } } private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index ea80c3acf..1a98a2513 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -542,6 +542,30 @@ --dbPassword${brokerDbPassword} --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + yarn + cluster + GenerateNotificationsJob + eu.dnetlib.dhp.broker.oa.GenerateNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --brokerApiBaseUrl${brokerApiBaseUrl} + diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json new file mode 100644 index 000000000..6e12783b9 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json @@ -0,0 +1,14 @@ +[ + { + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the dir that contains the events folder", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 70ab7d344..d2d7b6d11 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -119,12 +119,36 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + yarn + cluster + GenerateNotificationsJob + eu.dnetlib.dhp.broker.oa.GenerateNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + yarn @@ -135,6 +159,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} From d6e1f224086380de758d6be2854a13b6fe8809b4 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 5 Oct 2021 15:09:18 +0200 Subject: [PATCH 8/8] max numbers of workers for indexing --- .../dhp/broker/oa/generate_all/oozie_app/workflow.xml | 9 +++++++-- .../broker/oa/notifications_only/oozie_app/workflow.xml | 7 ++++++- .../dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml | 7 ++++++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 1a98a2513..bc6778f52 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -115,6 +115,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -498,7 +503,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} @@ -580,7 +585,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index d2d7b6d11..0d226d78e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -98,6 +98,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -159,7 +164,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml index 9095004ad..87adfffaa 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml @@ -75,6 +75,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -112,7 +117,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}