/**
 * Operands of a single subscription condition.
 * <p>
 * Plain serializable bean. {@code value} is the primary operand; {@code otherValue} is an optional second operand
 * (the notification job reads the pair as the lower/upper bound of a range).
 */
public class ConditionParams implements Serializable {

	private static final long serialVersionUID = 2719901844537516110L;

	/** Primary operand of the condition. */
	private String value;

	/** Secondary operand of the condition; may be {@code null} when the condition needs a single operand. */
	private String otherValue;

	// ---- readers ----

	/**
	 * @return the primary operand, or {@code null} if it was never set
	 */
	public String getValue() {
		return this.value;
	}

	/**
	 * @return the secondary operand, or {@code null} if it was never set
	 */
	public String getOtherValue() {
		return this.otherValue;
	}

	// ---- writers ----

	/**
	 * @param value the primary operand
	 */
	public void setValue(final String value) {
		this.value = value;
	}

	/**
	 * @param otherValue the secondary operand
	 */
	public void setOtherValue(final String otherValue) {
		this.otherValue = otherValue;
	}
}
{ + this.listParams = listParams; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java new file mode 100644 index 000000000..4ef25bf1f --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; + +public class Notification implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -1770420972526995727L; + + private String notificationId; + + private String subscriptionId; + + private String producerId; + + private String eventId; + + private String topic; + + private Long date; + + private String payload; + + private MappedFields map; + + public String getNotificationId() { + return notificationId; + } + + public void setNotificationId(final String notificationId) { + this.notificationId = notificationId; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + public void setSubscriptionId(final String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + public String getProducerId() { + return producerId; + } + + public void setProducerId(final String producerId) { + this.producerId = producerId; + } + + public String getEventId() { + return eventId; + } + + public void setEventId(final String eventId) { + this.eventId = eventId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(final String topic) { + this.topic = topic; + } + + public String getPayload() { + return payload; + } + + public void setPayload(final String payload) { + this.payload = payload; + } + + public MappedFields getMap() { + return map; + } + + public void setMap(final MappedFields map) { + this.map = map; + } + + public Long getDate() { + return date; + } + + public void setDate(final Long date) { + 
this.date = date; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java new file mode 100644 index 000000000..6cfd8b0a3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java @@ -0,0 +1,74 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.ObjectMapper; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class Subscription implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1051702214740830010L; + + private String subscriptionId; + + private String subscriber; + + private String topic; + + private String conditions; + + public String getSubscriptionId() { + return subscriptionId; + } + + public void setSubscriptionId(final String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + public String getSubscriber() { + return subscriber; + } + + public void setSubscriber(final String subscriber) { + this.subscriber = subscriber; + } + + public String getTopic() { + return topic; + } + + public void setTopic(final String topic) { + this.topic = topic; + } + + public String getConditions() { + return conditions; + } + + public void setConditions(final String conditions) { + this.conditions = conditions; + } + + public Map> conditionsAsMap() { + final ObjectMapper mapper = new ObjectMapper(); + try { + final List list = mapper + .readValue( + getConditions(), mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class)); + return list + .stream() + .filter(mc -> !mc.getListParams().isEmpty()) + .collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams)); + } catch 
(final Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java new file mode 100644 index 000000000..6de00dbee --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -0,0 +1,204 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Notification; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +public class IndexNotificationsJob { + + private static final Logger log = 
LoggerFactory.getLogger(IndexNotificationsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String index = parser.get("index"); + log.info("index: {}", index); + + final String indexHost = parser.get("esHost"); + log.info("indexHost: {}", indexHost); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); + + final long now = new Date().getTime(); + + final List subscriptions = listSubscriptions(brokerApiBaseUrl); + + log.info("Number of subscriptions: " + subscriptions.size()); + + if (subscriptions.size() > 0) { + final Dataset notifications = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .map(e -> generateNotifications(e, subscriptions, now), Encoders.bean(NotificationGroup.class)) + .flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class)); + + final JavaRDD inputRdd = notifications + .map(n -> prepareForIndexing(n, total), Encoders.STRING()) + .javaRDD(); + + final Map esCfg = new HashMap<>(); + // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); + + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + 
esCfg.put("es.nodes.wan.only", "true"); + + log.info("*** Start indexing"); + JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + log.info("*** End indexing"); + } + } + + private static NotificationGroup generateNotifications(final Event e, + final List subscriptions, + final long date) { + final List list = subscriptions + .stream() + .filter(s -> s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) + .map(s -> generateNotification(s, e, date)) + .collect(Collectors.toList()); + + return new NotificationGroup(list); + } + + private static Notification generateNotification(final Subscription s, final Event e, final long date) { + final Notification n = new Notification(); + n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); + n.setSubscriptionId(s.getSubscriptionId()); + n.setEventId(e.getEventId()); + n.setProducerId(e.getProducerId()); + n.setTopic(e.getTopic()); + n.setPayload(e.getPayload()); + n.setMap(e.getMap()); + n.setDate(date); + return n; + } + + private static boolean verifyConditions(final MappedFields map, + final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + + if (conditions.containsKey("trust") + && !SubscriptionUtils + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { + return false; + } + + if (conditions.containsKey("targetDateofacceptance") && !conditions + .get("targetDateofacceptance") + .stream() + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + return false; + } + + if (conditions.containsKey("targetResultTitle") + && !conditions + .get("targetResultTitle") + .stream() + .anyMatch(c -> 
SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetAuthors") + && !conditions + .get("targetAuthors") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetSubjects") + && !conditions + .get("targetSubjects") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue()))) { + return false; + } + + return true; + + } + + private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { + final String url = brokerApiBaseUrl + "/api/subscriptions"; + final HttpGet req = new HttpGet(url); + + final ObjectMapper mapper = new ObjectMapper(); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String s = IOUtils.toString(response.getEntity().getContent()); + return mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); + } + } + + } + + private static String prepareForIndexing(final Notification n, final LongAccumulator acc) + throws JsonProcessingException { + acc.add(1); + return new ObjectMapper().writeValueAsString(n); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java new file mode 100644 index 000000000..80cf7609b --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java @@ -0,0 +1,44 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.broker.model.Notification; + +public class NotificationGroup implements Serializable { + + /** + * + */ 
+ private static final long serialVersionUID = 720996471281158977L; + + private List data = new ArrayList<>(); + + public NotificationGroup() { + } + + public NotificationGroup(final List data) { + this.data = data; + } + + public List getData() { + return data; + } + + public void setData(final List data) { + this.data = data; + } + + public NotificationGroup addElement(final Notification elem) { + data.add(elem); + return this; + } + + public NotificationGroup addGroup(final NotificationGroup group) { + data.addAll(group.getData()); + return this; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java new file mode 100644 index 000000000..adb1c753b --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java @@ -0,0 +1,49 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import java.text.ParseException; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.commons.lang3.time.DateUtils; + +public class SubscriptionUtils { + + private static final long ONE_DAY = 86_400_000; + + public static boolean verifyListSimilar(final List list, final String value) { + return list.stream().anyMatch(s -> verifySimilar(s, value)); + } + + public static boolean verifyListExact(final List list, final String value) { + return list.stream().anyMatch(s -> verifyExact(s, value)); + } + + public static boolean verifySimilar(final String s1, final String s2) { + for (final String part : s2.split("\\W+")) { + if (!StringUtils.containsIgnoreCase(s1, part)) { + return false; + } + } + return true; + } + + public static boolean verifyFloatRange(final float trust, final String min, final String max) { + return trust >= NumberUtils.toFloat(min, 0) && trust <= NumberUtils.toFloat(max, 1); 
+ } + + public static boolean verifyDateRange(final long date, final String min, final String max) { + try { + return date >= DateUtils.parseDate(min, "yyyy-MM-dd").getTime() + && date < DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY; + } catch (final ParseException e) { + return false; + } + } + + public static boolean verifyExact(final String s1, final String s2) { + return StringUtils.equalsIgnoreCase(s1, s2); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 2e669676b..4184b71bd 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -26,7 +26,11 @@ esEventIndexName - the elasticsearch index name + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications esIndexHost @@ -458,6 +462,32 @@ --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + yarn + cluster + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esNotificationsIndexName} + --esHost${esIndexHost} + --brokerApiBaseUrl${brokerApiBaseUrl} + diff --git 
a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json new file mode 100644 index 000000000..5eea894c8 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "o", + "paramLongName": "workingPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "idx", + "paramLongName": "index", + "paramDescription": "the ES index", + "paramRequired": true + }, + { + "paramName": "es", + "paramLongName": "esHost", + "paramDescription": "the ES host", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 0b0557693..f629c2101 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -26,7 +26,11 @@ esEventIndexName - the elasticsearch index name + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications esIndexHost @@ -95,18 +99,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexEventSubsetOnESJob - eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob dhp-broker-events-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -119,14 +123,14 @@ --conf 
spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esEventIndexName} + --index${esNotificationsIndexName} --esHost${esIndexHost} - --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} + diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java new file mode 100644 index 000000000..b532aa9f7 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java @@ -0,0 +1,52 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +class SubscriptionUtilsTest { + + @Test + void testVerifyListSimilar() { + assertTrue(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "artini")); + assertFalse(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "bardi")); + } + + @Test + void testVerifyListExact() { + assertTrue(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "perl")); + assertFalse(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "C")); + } + + @Test + void testVerifySimilar() { + assertTrue(SubscriptionUtils.verifySimilar("Java Programming", "java")); + assertFalse(SubscriptionUtils.verifySimilar("Java Programming", "soap")); + } + + @Test + void testVerifyFloatRange() { + assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "0.4", "0.6")); + assertFalse(SubscriptionUtils.verifyFloatRange(0.8f, "0.4", "0.6")); + assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "", "")); + } + + @Test + void testVerifyDateRange() { + final long date = 1282738478000l; // 25 August 2010 + + 
assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "2011-01-01")); + assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "2021-01-01")); + } + + @Test + void testVerifyExact() { + assertTrue(SubscriptionUtils.verifyExact("Java Programming", "java programming")); + assertFalse(SubscriptionUtils.verifyExact("Java Programming", "soap programming")); + } + +}