diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java new file mode 100644 index 000000000..21c6c64a6 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java @@ -0,0 +1,184 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Notification; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +public class GenerateNotificationsJob { + + private static final Logger log = LoggerFactory.getLogger(GenerateNotificationsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString(GenerateNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_notifications.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("outputDir") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String notificationsPath = parser.get("outputDir") + "/notifications"; + log.info("notificationsPath: {}", notificationsPath); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_notifications"); + + final long startTime = new Date().getTime(); + + final List subscriptions = listSubscriptions(brokerApiBaseUrl); + + log.info("Number of subscriptions: " + subscriptions.size()); + + if (subscriptions.size() > 0) { + final Map>> conditionsMap = prepareConditionsMap(subscriptions); + + log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); + + final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); + final Encoder nEncoder = Encoders.bean(Notification.class); + final Dataset notifications = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .map((MapFunction) e -> generateNotifications(e, subscriptions, conditionsMap, startTime), ngEncoder) + .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); + + ClusterUtils.save(notifications, notificationsPath, Notification.class, total); + } + } + + protected static Map>> prepareConditionsMap( + final List subscriptions) { + final Map>> map = new HashMap<>(); + subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); + return map; + } + + protected static NotificationGroup generateNotifications(final Event e, + final List subscriptions, + final Map>> conditionsMap, + final long date) { + final List list = subscriptions + .stream() + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) + .map(s -> generateNotification(s, e, date)) + .collect(Collectors.toList()); + + return new NotificationGroup(list); + } + + private static Notification generateNotification(final Subscription s, final Event e, final long date) { + final Notification n = new Notification(); + n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); + n.setSubscriptionId(s.getSubscriptionId()); + n.setEventId(e.getEventId()); + n.setProducerId(e.getProducerId()); + n.setTopic(e.getTopic()); + n.setPayload(e.getPayload()); + n.setMap(e.getMap()); + n.setDate(date); + return n; + } + + private static boolean verifyConditions(final MappedFields map, + final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + + if (conditions.containsKey("trust") + && !SubscriptionUtils + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + return false; + } + + if (conditions.containsKey("targetDateofacceptance") && !conditions + .get("targetDateofacceptance") + .stream() + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + return false; + } + + if (conditions.containsKey("targetResultTitle") + && !conditions + .get("targetResultTitle") + .stream() + .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetAuthors") + && !conditions + .get("targetAuthors") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { + return false; + } + + return !conditions.containsKey("targetSubjects") + || conditions + .get("targetSubjects") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue())); + + } + + private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { + final String url = brokerApiBaseUrl + "/api/subscriptions"; + final HttpGet req = new HttpGet(url); + + final ObjectMapper mapper = new ObjectMapper(); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String s = IOUtils.toString(response.getEntity().getContent()); + return mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); + } + } + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index fb2e26ba2..55e82446f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -2,28 +2,22 @@ package eu.dnetlib.dhp.broker.oa; import java.io.IOException; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,14 +25,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.broker.model.ConditionParams; -import eu.dnetlib.dhp.broker.model.Event; -import eu.dnetlib.dhp.broker.model.MappedFields; import eu.dnetlib.dhp.broker.model.Notification; -import eu.dnetlib.dhp.broker.model.Subscription; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; -import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; public class IndexNotificationsJob { @@ -48,15 +36,14 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("outputDir") + "/events"; - log.info("eventsPath: {}", eventsPath); + final String notificationsPath = parser.get("outputDir") + "/notifications"; + log.info("notificationsPath: {}", notificationsPath); final String index = parser.get("index"); log.info("index: {}", index); @@ -83,131 +70,38 @@ public class IndexNotificationsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); - final long startTime = new Date().getTime(); + final Long date = ClusterUtils + .readPath(spark, notificationsPath, Notification.class) + .first() + .getDate(); - final List subscriptions = listSubscriptions(brokerApiBaseUrl); + final JavaRDD toIndexRdd = ClusterUtils + .readPath(spark, notificationsPath, Notification.class) + .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) + .javaRDD(); - log.info("Number of subscriptions: " + subscriptions.size()); + final Map esCfg = new HashMap<>(); - if (subscriptions.size() > 0) { - final Map>> conditionsMap = prepareConditionsMap(subscriptions); + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); - log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); + log.info("*** Start indexing"); + JavaEsSpark.saveJsonToEs(toIndexRdd, index, esCfg); + log.info("*** End indexing"); - final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); - final Encoder nEncoder = Encoders.bean(Notification.class); - final Dataset notifications = ClusterUtils - .readPath(spark, eventsPath, Event.class) - .map( - (MapFunction) e -> generateNotifications( - e, subscriptions, conditionsMap, startTime), - ngEncoder) - .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); + log.info("*** Deleting old notifications"); + final String message = deleteOldNotifications(brokerApiBaseUrl, date - 1000); + log.info("*** Deleted notifications: {}", message); - notifications - .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) - .javaRDD() - .saveAsTextFile("/tmp/IndexNotificationsJob_test_6504"); - } - } + log.info("*** sendNotifications (emails, ...)"); + sendNotifications(brokerApiBaseUrl, date - 1000); + log.info("*** ALL done."); - protected static Map>> prepareConditionsMap( - final List subscriptions) { - final Map>> map = new HashMap<>(); - subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); - return map; - } - - protected static NotificationGroup generateNotifications(final Event e, - final List subscriptions, - final Map>> conditionsMap, - final long date) { - final List list = subscriptions - .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) - .map(s -> generateNotification(s, e, date)) - .collect(Collectors.toList()); - - return new NotificationGroup(list); - } - - private static Notification generateNotification(final Subscription s, final Event e, final long date) { - final Notification n = new Notification(); - n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); - n.setSubscriptionId(s.getSubscriptionId()); - n.setEventId(e.getEventId()); - n.setProducerId(e.getProducerId()); - n.setTopic(e.getTopic()); - n.setPayload(e.getPayload()); - n.setMap(e.getMap()); - n.setDate(date); - return n; - } - - private static boolean verifyConditions(final MappedFields map, - final Map> conditions) { - if (conditions.containsKey("targetDatasourceName") - && !SubscriptionUtils - .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { - return false; - } - - if (conditions.containsKey("trust") - && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { - return false; - } - - if (conditions.containsKey("targetDateofacceptance") && !conditions - .get("targetDateofacceptance") - .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { - return false; - } - - if (conditions.containsKey("targetResultTitle") - && !conditions - .get("targetResultTitle") - .stream() - .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { - return false; - } - - if (conditions.containsKey("targetAuthors") - && !conditions - .get("targetAuthors") - .stream() - .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { - return false; - } - - return !conditions.containsKey("targetSubjects") - || conditions - .get("targetSubjects") - .stream() - .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue())); - - } - - private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { - final String url = brokerApiBaseUrl + "/api/subscriptions"; - final HttpGet req = new HttpGet(url); - - final ObjectMapper mapper = new ObjectMapper(); - - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - final String s = IOUtils.toString(response.getEntity().getContent()); - return mapper - .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); - } - } } private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index ea80c3acf..bc6778f52 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -115,6 +115,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -498,7 +503,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} @@ -542,6 +547,30 @@ --dbPassword${brokerDbPassword} --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + yarn + cluster + GenerateNotificationsJob + eu.dnetlib.dhp.broker.oa.GenerateNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --brokerApiBaseUrl${brokerApiBaseUrl} + @@ -556,7 +585,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json new file mode 100644 index 000000000..6e12783b9 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json @@ -0,0 +1,14 @@ +[ + { + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the dir that contains the events folder", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 70ab7d344..0d226d78e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -98,6 +98,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -119,12 +124,36 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + yarn + cluster + GenerateNotificationsJob + eu.dnetlib.dhp.broker.oa.GenerateNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + yarn @@ -135,6 +164,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml index 9095004ad..87adfffaa 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml @@ -75,6 +75,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -112,7 +117,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java new file mode 100644 index 000000000..8ecb3061b --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java @@ -0,0 +1,133 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; + +class IndexNotificationsJobTest { + + private List subscriptions; + + private Map>> conditionsMap; + + private static final int N_TIMES = 1_000_000; + + @BeforeEach + void setUp() throws Exception { + final Subscription s = new Subscription(); + s.setTopic("ENRICH/MISSING/PID"); + s + .setConditions( + "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); + subscriptions = Arrays.asList(s); + conditionsMap = IndexNotificationsJob.prepareConditionsMap(subscriptions); + } + + @Test + void testGenerateNotifications_invalid_topic() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); + + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); + assertEquals(0, res.getData().size()); + } + + @Test + void testGenerateNotifications_topic_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); + assertEquals(1, res.getData().size()); + } + + @Test + void testGenerateNotifications_topic_no_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); + assertEquals(0, res.getData().size()); + } + + @Test + void testGenerateNotifications_invalid_topic_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); + + // warm up + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + } + final long end = System.currentTimeMillis(); + System.out + .println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + + } + + @Test + void testGenerateNotifications_topic_match_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + // warm up + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + } + final long end = System.currentTimeMillis(); + System.out + .println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + } + + @Test + void testGenerateNotifications_topic_no_match_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + // warm up + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + } + final long end = System.currentTimeMillis(); + System.out + .println( + String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java new file mode 100644 index 000000000..a6d1c89d3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java @@ -0,0 +1,132 @@ + +package eu.dnetlib.dhp.broker.oa.samples; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.MapCondition; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +@Disabled +public class SimpleVariableJobTest { + + private static final Logger log = LoggerFactory.getLogger(SimpleVariableJobTest.class); + + private static Path workingDir; + + private static SparkSession spark; + + private final static List inputList = new ArrayList<>(); + + private static final Map>> staticMap = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + + workingDir = Files.createTempDirectory(SimpleVariableJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + final SparkConf conf = new SparkConf(); + conf.setAppName(SimpleVariableJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + // conf.set("spark.sql.warehouse.dir", workingDir.toString()); + // conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(SimpleVariableJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + + for (int i = 0; i < 1_000_000; i++) { + inputList.add("record " + i); + } + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testSimpleVariableJob() throws Exception { + final Map>> map = fillMap(); + + final long n = spark + .createDataset(inputList, Encoders.STRING()) + .filter(s -> filter(map.get(s))) + .map((MapFunction) s -> s.toLowerCase(), Encoders.STRING()) + .count(); + + System.out.println(n); + } + + @Test + public void testSimpleVariableJob_static() throws Exception { + + staticMap.putAll(fillMap()); + + final long n = spark + .createDataset(inputList, Encoders.STRING()) + .filter(s -> filter(staticMap.get(s))) + .map((MapFunction) s -> s.toLowerCase(), Encoders.STRING()) + .count(); + + System.out.println(n); + } + + private static Map>> fillMap() + throws JsonParseException, JsonMappingException, IOException { + final String s = "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"; + + final ObjectMapper mapper = new ObjectMapper(); + final List list = mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class)); + final Map> conditions = list + .stream() + .filter(mc -> !mc.getListParams().isEmpty()) + .collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams)); + + final Map>> map = new HashMap<>(); + inputList.forEach(i -> map.put(i, conditions)); + return map; + } + + private static boolean filter(final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact("reposiTUm", conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + return true; + } + +}