notification indexing

This commit is contained in:
Michele Artini 2020-08-26 15:10:48 +02:00
parent da470422d3
commit 82ed8edafd
11 changed files with 652 additions and 8 deletions

View File

@ -0,0 +1,31 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
public class ConditionParams implements Serializable {
/**
*
*/
private static final long serialVersionUID = 2719901844537516110L;
private String value;
private String otherValue;
public String getValue() {
return value;
}
public void setValue(final String value) {
this.value = value;
}
public String getOtherValue() {
return otherValue;
}
public void setOtherValue(final String otherValue) {
this.otherValue = otherValue;
}
}

View File

@ -0,0 +1,37 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class MapCondition implements Serializable {
/**
*
*/
private static final long serialVersionUID = -7137490975452466813L;
private String field;
private List<ConditionParams> listParams = new ArrayList<>();
public String getField() {
return field;
}
public void setField(final String field) {
this.field = field;
}
public List<ConditionParams> getListParams() {
return listParams;
}
public void setListParams(final List<ConditionParams> listParams) {
this.listParams = listParams;
}
}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
public class Notification implements Serializable {
/**
*
*/
private static final long serialVersionUID = -1770420972526995727L;
private String notificationId;
private String subscriptionId;
private String producerId;
private String eventId;
private String topic;
private Long date;
private String payload;
private MappedFields map;
public String getNotificationId() {
return notificationId;
}
public void setNotificationId(final String notificationId) {
this.notificationId = notificationId;
}
public String getSubscriptionId() {
return subscriptionId;
}
public void setSubscriptionId(final String subscriptionId) {
this.subscriptionId = subscriptionId;
}
public String getProducerId() {
return producerId;
}
public void setProducerId(final String producerId) {
this.producerId = producerId;
}
public String getEventId() {
return eventId;
}
public void setEventId(final String eventId) {
this.eventId = eventId;
}
public String getTopic() {
return topic;
}
public void setTopic(final String topic) {
this.topic = topic;
}
public String getPayload() {
return payload;
}
public void setPayload(final String payload) {
this.payload = payload;
}
public MappedFields getMap() {
return map;
}
public void setMap(final MappedFields map) {
this.map = map;
}
public Long getDate() {
return date;
}
public void setDate(final Long date) {
this.date = date;
}
}

View File

@ -0,0 +1,74 @@
package eu.dnetlib.dhp.broker.model;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
@JsonIgnoreProperties(ignoreUnknown = true)
public class Subscription implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1051702214740830010L;
private String subscriptionId;
private String subscriber;
private String topic;
private String conditions;
public String getSubscriptionId() {
return subscriptionId;
}
public void setSubscriptionId(final String subscriptionId) {
this.subscriptionId = subscriptionId;
}
public String getSubscriber() {
return subscriber;
}
public void setSubscriber(final String subscriber) {
this.subscriber = subscriber;
}
public String getTopic() {
return topic;
}
public void setTopic(final String topic) {
this.topic = topic;
}
public String getConditions() {
return conditions;
}
public void setConditions(final String conditions) {
this.conditions = conditions;
}
public Map<String, List<ConditionParams>> conditionsAsMap() {
final ObjectMapper mapper = new ObjectMapper();
try {
final List<MapCondition> list = mapper
.readValue(
getConditions(), mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class));
return list
.stream()
.filter(mc -> !mc.getListParams().isEmpty())
.collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams));
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,204 @@
package eu.dnetlib.dhp.broker.oa;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.ConditionParams;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.MappedFields;
import eu.dnetlib.dhp.broker.model.Notification;
import eu.dnetlib.dhp.broker.model.Subscription;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.NotificationGroup;
import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils;
public class IndexNotificationsJob {
private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexNotificationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
log.info("index: {}", index);
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");
final long now = new Date().getTime();
final List<Subscription> subscriptions = listSubscriptions(brokerApiBaseUrl);
log.info("Number of subscriptions: " + subscriptions.size());
if (subscriptions.size() > 0) {
final Dataset<Notification> notifications = ClusterUtils
.readPath(spark, eventsPath, Event.class)
.map(e -> generateNotifications(e, subscriptions, now), Encoders.bean(NotificationGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class));
final JavaRDD<String> inputRdd = notifications
.map(n -> prepareForIndexing(n, total), Encoders.STRING())
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
}
}
private static NotificationGroup generateNotifications(final Event e,
final List<Subscription> subscriptions,
final long date) {
final List<Notification> list = subscriptions
.stream()
.filter(s -> s.getTopic().equals(e.getTopic()))
.filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap()))
.map(s -> generateNotification(s, e, date))
.collect(Collectors.toList());
return new NotificationGroup(list);
}
private static Notification generateNotification(final Subscription s, final Event e, final long date) {
final Notification n = new Notification();
n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId()));
n.setSubscriptionId(s.getSubscriptionId());
n.setEventId(e.getEventId());
n.setProducerId(e.getProducerId());
n.setTopic(e.getTopic());
n.setPayload(e.getPayload());
n.setMap(e.getMap());
n.setDate(date);
return n;
}
private static boolean verifyConditions(final MappedFields map,
final Map<String, List<ConditionParams>> conditions) {
if (conditions.containsKey("targetDatasourceName")
&& !SubscriptionUtils
.verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) {
return false;
}
if (conditions.containsKey("trust")
&& !SubscriptionUtils
.verifyFloatRange(
map.getTrust(), conditions.get("trust").get(0).getValue(),
conditions.get("trust").get(0).getOtherValue())) {
return false;
}
if (conditions.containsKey("targetDateofacceptance") && !conditions
.get("targetDateofacceptance")
.stream()
.anyMatch(
c -> SubscriptionUtils
.verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
return false;
}
if (conditions.containsKey("targetResultTitle")
&& !conditions
.get("targetResultTitle")
.stream()
.anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) {
return false;
}
if (conditions.containsKey("targetAuthors")
&& !conditions
.get("targetAuthors")
.stream()
.allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) {
return false;
}
if (conditions.containsKey("targetSubjects")
&& !conditions
.get("targetSubjects")
.stream()
.allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue()))) {
return false;
}
return true;
}
private static List<Subscription> listSubscriptions(final String brokerApiBaseUrl) throws Exception {
final String url = brokerApiBaseUrl + "/api/subscriptions";
final HttpGet req = new HttpGet(url);
final ObjectMapper mapper = new ObjectMapper();
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
final String s = IOUtils.toString(response.getEntity().getContent());
return mapper
.readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class));
}
}
}
private static String prepareForIndexing(final Notification n, final LongAccumulator acc)
throws JsonProcessingException {
acc.add(1);
return new ObjectMapper().writeValueAsString(n);
}
}

View File

@ -0,0 +1,44 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Notification;
public class NotificationGroup implements Serializable {
/**
*
*/
private static final long serialVersionUID = 720996471281158977L;
private List<Notification> data = new ArrayList<>();
public NotificationGroup() {
}
public NotificationGroup(final List<Notification> data) {
this.data = data;
}
public List<Notification> getData() {
return data;
}
public void setData(final List<Notification> data) {
this.data = data;
}
public NotificationGroup addElement(final Notification elem) {
data.add(elem);
return this;
}
public NotificationGroup addGroup(final NotificationGroup group) {
data.addAll(group.getData());
return this;
}
}

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.text.ParseException;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.DateUtils;
public class SubscriptionUtils {
private static final long ONE_DAY = 86_400_000;
public static boolean verifyListSimilar(final List<String> list, final String value) {
return list.stream().anyMatch(s -> verifySimilar(s, value));
}
public static boolean verifyListExact(final List<String> list, final String value) {
return list.stream().anyMatch(s -> verifyExact(s, value));
}
public static boolean verifySimilar(final String s1, final String s2) {
for (final String part : s2.split("\\W+")) {
if (!StringUtils.containsIgnoreCase(s1, part)) {
return false;
}
}
return true;
}
public static boolean verifyFloatRange(final float trust, final String min, final String max) {
return trust >= NumberUtils.toFloat(min, 0) && trust <= NumberUtils.toFloat(max, 1);
}
public static boolean verifyDateRange(final long date, final String min, final String max) {
try {
return date >= DateUtils.parseDate(min, "yyyy-MM-dd").getTime()
&& date < DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY;
} catch (final ParseException e) {
return false;
}
}
public static boolean verifyExact(final String s1, final String s2) {
return StringUtils.equalsIgnoreCase(s1, s2);
}
}

View File

@ -26,7 +26,11 @@
</property>
<property>
<name>esEventIndexName</name>
<description>the elasticsearch index name</description>
<description>the elasticsearch index name for events</description>
</property>
<property>
<name>esNotificationsIndexName</name>
<description>the elasticsearch index name for notifications</description>
</property>
<property>
<name>esIndexHost</name>
@ -458,6 +462,32 @@
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="index_notifications"/>
<error to="Kill"/>
</action>
<action name="index_notifications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexNotificationsOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexNotificationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="stats"/>
<error to="Kill"/>
</action>

View File

@ -0,0 +1,26 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramRequired": true
},
{
"paramName": "idx",
"paramLongName": "index",
"paramDescription": "the ES index",
"paramRequired": true
},
{
"paramName": "es",
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
},
{
"paramName": "broker",
"paramLongName": "brokerApiBaseUrl",
"paramDescription": "the url of the broker service api",
"paramRequired": true
}
]

View File

@ -26,7 +26,11 @@
</property>
<property>
<name>esEventIndexName</name>
<description>the elasticsearch index name</description>
<description>the elasticsearch index name for events</description>
</property>
<property>
<name>esNotificationsIndexName</name>
<description>the elasticsearch index name for notifications</description>
</property>
<property>
<name>esIndexHost</name>
@ -95,18 +99,18 @@
</configuration>
</global>
<start to="index_event_subset"/>
<start to="index_notifications"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="index_event_subset">
<action name="index_notifications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexEventSubsetOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob</class>
<name>IndexNotificationsOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexNotificationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
@ -119,15 +123,15 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.broker.oa.util;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import org.junit.jupiter.api.Test;
class SubscriptionUtilsTest {
@Test
void testVerifyListSimilar() {
assertTrue(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "artini"));
assertFalse(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "bardi"));
}
@Test
void testVerifyListExact() {
assertTrue(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "perl"));
assertFalse(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "C"));
}
@Test
void testVerifySimilar() {
assertTrue(SubscriptionUtils.verifySimilar("Java Programming", "java"));
assertFalse(SubscriptionUtils.verifySimilar("Java Programming", "soap"));
}
@Test
void testVerifyFloatRange() {
assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "0.4", "0.6"));
assertFalse(SubscriptionUtils.verifyFloatRange(0.8f, "0.4", "0.6"));
assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "", ""));
}
@Test
void testVerifyDateRange() {
final long date = 1282738478000l; // 25 August 2010
assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "2011-01-01"));
assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "2021-01-01"));
}
@Test
void testVerifyExact() {
assertTrue(SubscriptionUtils.verifyExact("Java Programming", "java programming"));
assertFalse(SubscriptionUtils.verifyExact("Java Programming", "soap programming"));
}
}