diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index e8ef5dd3e9..fb2e26ba2a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -17,7 +17,6 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -25,7 +24,6 @@ import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; -import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +31,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.broker.model.*; +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Notification; +import eu.dnetlib.dhp.broker.model.Subscription; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; @@ -85,55 +87,46 @@ public class IndexNotificationsJob { final List subscriptions = listSubscriptions(brokerApiBaseUrl); - log.info("Number of subscriptions: {}", subscriptions.size()); + log.info("Number of subscriptions: " + subscriptions.size()); + + if (subscriptions.size() > 0) { + final Map>> conditionsMap = prepareConditionsMap(subscriptions); + + log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); - if (!subscriptions.isEmpty()) { final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); final Encoder nEncoder = Encoders.bean(Notification.class); final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) .map( - (MapFunction) e -> generateNotifications(e, subscriptions, startTime), + (MapFunction) e -> generateNotifications( + e, subscriptions, conditionsMap, startTime), ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); - final JavaRDD inputRdd = notifications + notifications .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) - .javaRDD(); - - final Map esCfg = new HashMap<>(); - - esCfg.put("es.index.auto.create", "false"); - esCfg.put("es.nodes", indexHost); - esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); - esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); - esCfg.put("es.batch.size.entries", esBatchSizeEntries); - esCfg.put("es.nodes.wan.only", esNodesWanOnly); - - log.info("*** Start indexing"); - JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); - log.info("*** End indexing"); - - log.info("*** Deleting old notifications"); - final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** Deleted notifications: {}", message); - - log.info("*** sendNotifications (emails, ...)"); - sendNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** ALL done."); - + .javaRDD() + .saveAsTextFile("/tmp/IndexNotificationsJob_test_6504"); } } - private static NotificationGroup generateNotifications(final Event e, + protected static Map>> prepareConditionsMap( + final List subscriptions) { + final Map>> map = new HashMap<>(); + subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); + return map; + } + + protected static NotificationGroup generateNotifications(final Event e, final List subscriptions, + final Map>> conditionsMap, final long date) { final List list = subscriptions .stream() .filter( s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) + .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -169,28 +162,28 @@ public class IndexNotificationsJob { return false; } - if (conditions.containsKey("targetDateofacceptance") && conditions + if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .noneMatch( + .anyMatch( c -> SubscriptionUtils .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } if (conditions.containsKey("targetResultTitle") - && conditions + && !conditions .get("targetResultTitle") .stream() - .noneMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { + .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { return false; } if (conditions.containsKey("targetAuthors") - && conditions + && !conditions .get("targetAuthors") .stream() - .noneMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { + .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { return false; } @@ -202,7 +195,7 @@ public class IndexNotificationsJob { } - private static List listSubscriptions(final String brokerApiBaseUrl) throws IOException { + private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { final String url = brokerApiBaseUrl + "/api/subscriptions"; final HttpGet req = new HttpGet(url); @@ -217,7 +210,7 @@ public class IndexNotificationsJob { } } - private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws IOException { + private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l; final HttpDelete req = new HttpDelete(url); diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 248326d57e..70ab7d3443 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -135,7 +135,6 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}