diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index c66e5f4f4..5dc3315c4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -44,14 +44,13 @@ public class IndexNotificationsJob { private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class); - private static Map>> conditionsForSubscriptions = new HashMap<>(); - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -88,16 +87,19 @@ public class IndexNotificationsJob { final List subscriptions = listSubscriptions(brokerApiBaseUrl); - initConditionsForSubscriptions(subscriptions); - log.info("Number of subscriptions: " + subscriptions.size()); if (subscriptions.size() > 0) { + final Map>> conditionsMap = prepareConditionsMap(subscriptions); + final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); final Encoder nEncoder = Encoders.bean(Notification.class); final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) - .map((MapFunction) e -> generateNotifications(e, subscriptions, startTime), ngEncoder) + .map( + (MapFunction) e -> generateNotifications( + e, subscriptions, conditionsMap, startTime), + ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); notifications @@ -107,27 +109,28 @@ public class IndexNotificationsJob { } } - protected static void initConditionsForSubscriptions(final List subscriptions) { - subscriptions.forEach(s -> conditionsForSubscriptions.put(s.getSubscriptionId(), s.conditionsAsMap())); + protected static Map>> prepareConditionsMap( + final List subscriptions) { + final Map>> map = new HashMap<>(); + subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); + return map; } protected static NotificationGroup generateNotifications(final Event e, final List subscriptions, + final Map>> conditionsMap, final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), conditionsAsMap(s))) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); return new NotificationGroup(list); } - private static Map> conditionsAsMap(final Subscription s) { - return conditionsForSubscriptions.get(s.getSubscriptionId()); - } - private static Notification generateNotification(final Subscription s, final Event e, final long date) { final Notification n = new Notification(); n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); @@ -151,15 +154,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java index 6010da2c3..8ecb3061b 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJobTest.java @@ -5,10 +5,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Arrays; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import eu.dnetlib.dhp.broker.model.ConditionParams; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.MappedFields; import eu.dnetlib.dhp.broker.model.Subscription; @@ -18,15 +20,19 @@ class IndexNotificationsJobTest { private List subscriptions; + private Map>> conditionsMap; + private static final int N_TIMES = 1_000_000; @BeforeEach void setUp() throws Exception { final Subscription s = new Subscription(); s.setTopic("ENRICH/MISSING/PID"); - s.setConditions("[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); + s + .setConditions( + "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); subscriptions = Arrays.asList(s); - IndexNotificationsJob.initConditionsForSubscriptions(subscriptions); + conditionsMap = IndexNotificationsJob.prepareConditionsMap(subscriptions); } @Test @@ -34,7 +40,8 @@ class IndexNotificationsJobTest { final Event event = new Event(); event.setTopic("ENRICH/MISSING/PROJECT"); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); assertEquals(0, res.getData().size()); } @@ -46,7 +53,8 @@ class IndexNotificationsJobTest { event.getMap().setTargetDatasourceName("reposiTUm"); event.getMap().setTrust(0.8f); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); assertEquals(1, res.getData().size()); } @@ -58,7 +66,8 @@ class IndexNotificationsJobTest { event.getMap().setTargetDatasourceName("Puma"); event.getMap().setTrust(0.8f); - final NotificationGroup res = IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + final NotificationGroup res = IndexNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); assertEquals(0, res.getData().size()); } @@ -68,14 +77,15 @@ class IndexNotificationsJobTest { event.setTopic("ENRICH/MISSING/PROJECT"); // warm up - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); final long start = System.currentTimeMillis(); for (int i = 0; i < N_TIMES; i++) { - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); } final long end = System.currentTimeMillis(); - System.out.println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + System.out + .println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } @@ -88,14 +98,15 @@ class IndexNotificationsJobTest { event.getMap().setTrust(0.8f); // warm up - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); final long start = System.currentTimeMillis(); for (int i = 0; i < N_TIMES; i++) { - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); } final long end = System.currentTimeMillis(); - System.out.println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + System.out + .println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } @Test @@ -107,14 +118,16 @@ class IndexNotificationsJobTest { event.getMap().setTrust(0.8f); // warm up - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); final long start = System.currentTimeMillis(); for (int i = 0; i < N_TIMES; i++) { - IndexNotificationsJob.generateNotifications(event, subscriptions, 0); + IndexNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); } final long end = System.currentTimeMillis(); - System.out.println(String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + System.out + .println( + String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); } }