diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 7b65c27ab..c66e5f4f4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -17,7 +17,6 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -25,7 +24,6 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; -import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,13 +44,14 @@ public class IndexNotificationsJob { private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class); + private static Map>> conditionsForSubscriptions = new HashMap<>(); + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -89,6 +88,8 @@ public class IndexNotificationsJob { final List subscriptions = listSubscriptions(brokerApiBaseUrl); + initConditionsForSubscriptions(subscriptions); + log.info("Number of subscriptions: " + subscriptions.size()); if (subscriptions.size() > 0) { @@ -96,9 +97,7 @@ public class IndexNotificationsJob { final Encoder nEncoder = Encoders.bean(Notification.class); final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) - .map( - (MapFunction) e -> generateNotifications(e, subscriptions, startTime), - ngEncoder) + .map((MapFunction) e -> generateNotifications(e, subscriptions, startTime), ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); notifications @@ -108,20 +107,27 @@ public class IndexNotificationsJob { } } - private static NotificationGroup generateNotifications(final Event e, + protected static void initConditionsForSubscriptions(final List subscriptions) { + subscriptions.forEach(s -> conditionsForSubscriptions.put(s.getSubscriptionId(), s.conditionsAsMap())); + } + + protected static NotificationGroup generateNotifications(final Event e, final List subscriptions, final long date) { final List list = subscriptions .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsAsMap(s))) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); return new NotificationGroup(list); } + private static Map> conditionsAsMap(final Subscription s) { + return conditionsForSubscriptions.get(s.getSubscriptionId()); + } + private static Notification generateNotification(final Subscription s, final Event e, final long date) { final Notification n = new Notification(); n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); @@ -145,18 +151,15 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java new file mode 100644 index 000000000..ea5dbd5d1 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java @@ -0,0 +1,84 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; + +class IndexNotificationsJobTest { + + private List subscriptions; + + @BeforeEach + void setUp() throws Exception { + final Subscription s = new Subscription(); + s.setTopic("ENRICH/MISSING/PID"); + s.setConditions("[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); + subscriptions = Arrays.asList(s); + IndexNotificationsJob.initConditionsForSubscriptions(subscriptions); + } + + @Test + void testGenerateNotifications_invalid_topic() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); + + for (int i = 0; i < 10; i++) { + final long start = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final long end = System.currentTimeMillis(); + + System.out.println("no topic - execution time (ms): " + (end - start)); + + assertEquals(0, res.getData().size()); + } + } + + @Test + void testGenerateNotifications_topic_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + for (int i = 0; i < 10; i++) { + final long start = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final long end = System.currentTimeMillis(); + + System.out.println("topic match - execution time (ms): " + (end - start)); + + assertEquals(1, res.getData().size()); + } + } + + @Test + void testGenerateNotifications_topic_no_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + for (int i = 0; i < 10; i++) { + final long start = System.currentTimeMillis(); + final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final long end = System.currentTimeMillis(); + + System.out.println("topic no match - execution time (ms): " + (end - start)); + + assertEquals(0, res.getData().size()); + } + } + +}