diff --git a/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml b/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml
index fcba2c4b2..e4d85bf39 100644
--- a/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml
+++ b/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml
@@ -19,7 +19,7 @@
-
+
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java
index edc6f28f5..36013837e 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java
@@ -24,8 +24,10 @@ public class Instance implements Serializable {
private String type;
+
private List<String> url;
+
private String publicationdate;// dateofacceptance;
private String refereed; // peer-review status
diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index f98708c64..75cc0ea09 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -31,6 +31,10 @@
<artifactId>elasticsearch-hadoop</artifactId>
</dependency>
+<dependency>
+	<groupId>org.apache.httpcomponents</groupId>
+	<artifactId>httpclient</artifactId>
+</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java
new file mode 100644
index 000000000..375300c05
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java
@@ -0,0 +1,31 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.io.Serializable;
+
+public class ConditionParams implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 2719901844537516110L;
+
+ private String value;
+ private String otherValue;
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(final String value) {
+ this.value = value;
+ }
+
+ public String getOtherValue() {
+ return otherValue;
+ }
+
+ public void setOtherValue(final String otherValue) {
+ this.otherValue = otherValue;
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 0cb0d7801..429eb7d11 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -2,7 +2,6 @@
package eu.dnetlib.dhp.broker.model;
import java.text.ParseException;
-import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@@ -19,16 +18,12 @@ public class EventFactory {
private final static String PRODUCER_ID = "OpenAIRE";
- private static final int TTH_DAYS = 365;
-
private final static String[] DATE_PATTERNS = {
"yyyy-MM-dd"
};
public static Event newBrokerEvent(final UpdateInfo<?> updateInfo) {
- final long now = new Date().getTime();
-
final Event res = new Event();
final MappedFields map = createMapFromResult(updateInfo);
@@ -44,8 +39,8 @@ public class EventFactory {
res.setPayload(updateInfo.asBrokerPayload().toJSON());
res.setMap(map);
res.setTopic(updateInfo.getTopicPath());
- res.setCreationDate(now);
- res.setExpiryDate(calculateExpiryDate(now));
+ res.setCreationDate(0l);
+ res.setExpiryDate(Long.MAX_VALUE);
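+ // the real creation/expiry dates are assigned later, at indexing time (see IndexEventSubsetJob)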
res.setInstantMessage(false);
return res;
@@ -96,7 +91,9 @@ public class EventFactory {
return map;
}
- private static String calculateEventId(final String topic, final String dsId, final String publicationId,
+ private static String calculateEventId(final String topic,
+ final String dsId,
+ final String publicationId,
final String value) {
return "event-"
+ DigestUtils.md5Hex(topic).substring(0, 4) + "-"
@@ -105,10 +102,6 @@ public class EventFactory {
+ DigestUtils.md5Hex(value).substring(0, 5);
}
- private static long calculateExpiryDate(final long now) {
- return now + TTH_DAYS * 24 * 60 * 60 * 1000;
- }
-
private static long parseDateTolong(final String date) {
if (StringUtils.isBlank(date)) {
return -1;
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java
new file mode 100644
index 000000000..069eee2a8
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java
@@ -0,0 +1,37 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MapCondition implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -7137490975452466813L;
+
+ private String field;
+ private List<ConditionParams> listParams = new ArrayList<>();
+
+ public String getField() {
+ return field;
+ }
+
+ public void setField(final String field) {
+ this.field = field;
+ }
+
+ public List<ConditionParams> getListParams() {
+ return listParams;
+ }
+
+ public void setListParams(final List<ConditionParams> listParams) {
+ this.listParams = listParams;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java
new file mode 100644
index 000000000..4ef25bf1f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java
@@ -0,0 +1,93 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.io.Serializable;
+
+public class Notification implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -1770420972526995727L;
+
+ private String notificationId;
+
+ private String subscriptionId;
+
+ private String producerId;
+
+ private String eventId;
+
+ private String topic;
+
+ private Long date;
+
+ private String payload;
+
+ private MappedFields map;
+
+ public String getNotificationId() {
+ return notificationId;
+ }
+
+ public void setNotificationId(final String notificationId) {
+ this.notificationId = notificationId;
+ }
+
+ public String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ public void setSubscriptionId(final String subscriptionId) {
+ this.subscriptionId = subscriptionId;
+ }
+
+ public String getProducerId() {
+ return producerId;
+ }
+
+ public void setProducerId(final String producerId) {
+ this.producerId = producerId;
+ }
+
+ public String getEventId() {
+ return eventId;
+ }
+
+ public void setEventId(final String eventId) {
+ this.eventId = eventId;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public void setPayload(final String payload) {
+ this.payload = payload;
+ }
+
+ public MappedFields getMap() {
+ return map;
+ }
+
+ public void setMap(final MappedFields map) {
+ this.map = map;
+ }
+
+ public Long getDate() {
+ return date;
+ }
+
+ public void setDate(final Long date) {
+ this.date = date;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java
new file mode 100644
index 000000000..6cfd8b0a3
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java
@@ -0,0 +1,74 @@
+
+package eu.dnetlib.dhp.broker.model;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Subscription implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1051702214740830010L;
+
+ private String subscriptionId;
+
+ private String subscriber;
+
+ private String topic;
+
+ private String conditions;
+
+ public String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ public void setSubscriptionId(final String subscriptionId) {
+ this.subscriptionId = subscriptionId;
+ }
+
+ public String getSubscriber() {
+ return subscriber;
+ }
+
+ public void setSubscriber(final String subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ public String getConditions() {
+ return conditions;
+ }
+
+ public void setConditions(final String conditions) {
+ this.conditions = conditions;
+ }
+
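+ // deserializes the JSON string in "conditions" into a map (field name -> list of condition params); conditions without params are skipped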
+ public Map<String, List<ConditionParams>> conditionsAsMap() {
+ final ObjectMapper mapper = new ObjectMapper();
+ try {
+ final List<MapCondition> list = mapper
+ .readValue(
+ getConditions(), mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class));
+ return list
+ .stream()
+ .filter(mc -> !mc.getListParams().isEmpty())
+ .collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams));
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
index a51601cd7..8a9009f32 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
@@ -3,11 +3,16 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import java.io.IOException;
import java.util.Optional;
+import java.util.Properties;
import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
-import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
@@ -28,8 +33,8 @@ public class GenerateStatsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
- IndexOnESJob.class
- .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ GenerateStatsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@@ -43,21 +48,50 @@ public class GenerateStatsJob {
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
- final String statsPath = parser.get("workingPath") + "/stats";
- log.info("stats: {}", statsPath);
+ final String dbUrl = parser.get("dbUrl");
+ log.info("dbUrl: {}", dbUrl);
+
+ final String dbUser = parser.get("dbUser");
+ log.info("dbUser: {}", dbUser);
+
+ final String dbPassword = parser.get("dbPassword");
+ log.info("dbPassword: {}", "***");
+
+ final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
+ log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
final TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();
+ final Properties connectionProperties = new Properties();
+ connectionProperties.put("user", dbUser);
+ connectionProperties.put("password", dbPassword);
+
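+ // aggregates the events by (topic, target datasource) and writes the counts to a temporary table of the broker database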
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
- final Dataset<DatasourceStats> stats = ClusterUtils
+ ClusterUtils
.readPath(spark, eventsPath, Event.class)
- .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING())
+ .groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING())
.agg(aggr)
- .map(t -> t._2, Encoders.bean(DatasourceStats.class));
+ .map(t -> t._2, Encoders.bean(DatasourceStats.class))
+ .write()
+ .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties);
+
+ log.info("*** updateStats");
+ updateStats(brokerApiBaseUrl);
+ log.info("*** ALL done.");
- ClusterUtils.save(stats, statsPath, DatasourceStats.class, null);
});
}
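+ // asks the broker service, via its public API, to refresh the stats from the temporary table written above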
+ private static String updateStats(final String brokerApiBaseUrl) throws IOException {
+ final String url = brokerApiBaseUrl + "/api/openaireBroker/stats/update";
+ final HttpGet req = new HttpGet(url);
+
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+ }
+ }
+
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java
new file mode 100644
index 000000000..d3cbe0034
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java
@@ -0,0 +1,126 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.EventGroup;
+import eu.dnetlib.dhp.broker.oa.util.aggregators.subset.EventSubsetAggregator;
+
+public class IndexEventSubsetJob {
+
+ private static final Logger log = LoggerFactory.getLogger(IndexEventSubsetJob.class);
+
+ public static void main(final String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ IndexEventSubsetJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json")));
+ parser.parseArgument(args);
+
+ final SparkConf conf = new SparkConf();
+
+ final String eventsPath = parser.get("workingPath") + "/events";
+ log.info("eventsPath: {}", eventsPath);
+
+ final String index = parser.get("index");
+ log.info("index: {}", index);
+
+ final String indexHost = parser.get("esHost");
+ log.info("indexHost: {}", indexHost);
+
+ final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic"));
+ log.info("maxEventsForTopic: {}", maxEventsForTopic);
+
+ final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
+ log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
+
+ final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+
+ final TypedColumn<Event, EventGroup> aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn();
+
+ final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");
+
+ final long now = new Date().getTime();
+
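+ // keeps at most maxEventsForTopic events for each (topic, target datasource) pair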
+ final Dataset<Event> subset = ClusterUtils
+ .readPath(spark, eventsPath, Event.class)
+ .groupByKey(e -> e.getTopic() + '@' + e.getMap().getTargetDatasourceId(), Encoders.STRING())
+ .agg(aggr)
+ .map(t -> t._2, Encoders.bean(EventGroup.class))
+ .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
+
+ final JavaRDD<String> inputRdd = subset
+ .map(e -> prepareEventForIndexing(e, now, total), Encoders.STRING())
+ .javaRDD();
+
+ final Map<String, String> esCfg = new HashMap<>();
+ // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
+
+ esCfg.put("es.index.auto.create", "false");
+ esCfg.put("es.nodes", indexHost);
+ esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
+ esCfg.put("es.batch.write.retry.count", "8");
+ esCfg.put("es.batch.write.retry.wait", "60s");
+ esCfg.put("es.batch.size.entries", "200");
+ esCfg.put("es.nodes.wan.only", "true");
+
+ log.info("*** Start indexing");
+ JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
+ log.info("*** End indexing");
+
+ log.info("*** Deleting old events");
+ final String message = deleteOldEvents(brokerApiBaseUrl, now - 1000);
+ log.info("*** Deleted events: " + message);
+
+ }
+
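+ // deletes from the broker database, via its public API, the events created before this run (creation date < now - 1s)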
+ private static String deleteOldEvents(final String brokerApiBaseUrl, final long l) throws Exception {
+ final String url = brokerApiBaseUrl + "/api/events/byCreationDate/0/" + l;
+ final HttpDelete req = new HttpDelete(url);
+
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+ }
+
+ }
+
+ private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc)
+ throws JsonProcessingException {
+ acc.add(1);
+
+ e.setCreationDate(creationDate);
+ e.setExpiryDate(Long.MAX_VALUE);
+
+ return new ObjectMapper().writeValueAsString(e);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java
new file mode 100644
index 000000000..792a2354a
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java
@@ -0,0 +1,238 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.ConditionParams;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.model.MappedFields;
+import eu.dnetlib.dhp.broker.model.Notification;
+import eu.dnetlib.dhp.broker.model.Subscription;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import eu.dnetlib.dhp.broker.oa.util.NotificationGroup;
+import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils;
+
+public class IndexNotificationsJob {
+
+ private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class);
+
+ public static void main(final String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ IndexNotificationsJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json")));
+ parser.parseArgument(args);
+
+ final SparkConf conf = new SparkConf();
+
+ final String eventsPath = parser.get("workingPath") + "/events";
+ log.info("eventsPath: {}", eventsPath);
+
+ final String index = parser.get("index");
+ log.info("index: {}", index);
+
+ final String indexHost = parser.get("esHost");
+ log.info("indexHost: {}", indexHost);
+
+ final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
+ log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
+
+ final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+
+ final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");
+
+ final long startTime = new Date().getTime();
+
+ final List<Subscription> subscriptions = listSubscriptions(brokerApiBaseUrl);
+
+ log.info("Number of subscriptions: " + subscriptions.size());
+
+ if (subscriptions.size() > 0) {
+ final Dataset<Notification> notifications = ClusterUtils
+ .readPath(spark, eventsPath, Event.class)
+ .map(e -> generateNotifications(e, subscriptions, startTime), Encoders.bean(NotificationGroup.class))
+ .flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class));
+
+ final JavaRDD<String> inputRdd = notifications
+ .map(n -> prepareForIndexing(n, total), Encoders.STRING())
+ .javaRDD();
+
+ final Map<String, String> esCfg = new HashMap<>();
+ // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
+
+ esCfg.put("es.index.auto.create", "false");
+ esCfg.put("es.nodes", indexHost);
+ esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
+ esCfg.put("es.batch.write.retry.count", "8");
+ esCfg.put("es.batch.write.retry.wait", "60s");
+ esCfg.put("es.batch.size.entries", "200");
+ esCfg.put("es.nodes.wan.only", "true");
+
+ log.info("*** Start indexing");
+ JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
+ log.info("*** End indexing");
+
+ log.info("*** Deleting old notifications");
+ final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000);
+ log.info("*** Deleted notifications: " + message);
+
+ log.info("*** sendNotifications (emails, ...)");
+ sendNotifications(brokerApiBaseUrl, startTime - 1000);
+ log.info("*** ALL done.");
+
+ }
+ }
+
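+ // builds the notifications of a single event: one for each subscription with a matching topic and verified conditions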
+ private static NotificationGroup generateNotifications(final Event e,
+ final List subscriptions,
+ final long date) {
+ final List<Notification> list = subscriptions
+ .stream()
+ .filter(
+ s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
+ .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap()))
+ .map(s -> generateNotification(s, e, date))
+ .collect(Collectors.toList());
+
+ return new NotificationGroup(list);
+ }
+
+ private static Notification generateNotification(final Subscription s, final Event e, final long date) {
+ final Notification n = new Notification();
+ n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId()));
+ n.setSubscriptionId(s.getSubscriptionId());
+ n.setEventId(e.getEventId());
+ n.setProducerId(e.getProducerId());
+ n.setTopic(e.getTopic());
+ n.setPayload(e.getPayload());
+ n.setMap(e.getMap());
+ n.setDate(date);
+ return n;
+ }
+
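+ // checks the subscription conditions against the mapped fields: exact match for targetDatasourceName, range checks for trust and targetDateofacceptance, similarity for targetResultTitle and targetAuthors, exact match for targetSubjects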
+ private static boolean verifyConditions(final MappedFields map,
+ final Map<String, List<ConditionParams>> conditions) {
+ if (conditions.containsKey("targetDatasourceName")
+ && !SubscriptionUtils
+ .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) {
+ return false;
+ }
+
+ if (conditions.containsKey("trust")
+ && !SubscriptionUtils
+ .verifyFloatRange(
+ map.getTrust(), conditions.get("trust").get(0).getValue(),
+ conditions.get("trust").get(0).getOtherValue())) {
+ return false;
+ }
+
+ if (conditions.containsKey("targetDateofacceptance") && !conditions
+ .get("targetDateofacceptance")
+ .stream()
+ .anyMatch(
+ c -> SubscriptionUtils
+ .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
+ return false;
+ }
+
+ if (conditions.containsKey("targetResultTitle")
+ && !conditions
+ .get("targetResultTitle")
+ .stream()
+ .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) {
+ return false;
+ }
+
+ if (conditions.containsKey("targetAuthors")
+ && !conditions
+ .get("targetAuthors")
+ .stream()
+ .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) {
+ return false;
+ }
+
+ if (conditions.containsKey("targetSubjects")
+ && !conditions
+ .get("targetSubjects")
+ .stream()
+ .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue()))) {
+ return false;
+ }
+
+ return true;
+
+ }
+
+ private static List<Subscription> listSubscriptions(final String brokerApiBaseUrl) throws Exception {
+ final String url = brokerApiBaseUrl + "/api/subscriptions";
+ final HttpGet req = new HttpGet(url);
+
+ final ObjectMapper mapper = new ObjectMapper();
+
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ final String s = IOUtils.toString(response.getEntity().getContent());
+ return mapper
+ .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class));
+ }
+ }
+ }
+
+ private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception {
+ final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l;
+ final HttpDelete req = new HttpDelete(url);
+
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+ }
+ }
+
+ private static String sendNotifications(final String brokerApiBaseUrl, final long l) throws IOException {
+ final String url = brokerApiBaseUrl + "/api/openaireBroker/notifications/send/" + l;
+ final HttpGet req = new HttpGet(url);
+
+ try (final CloseableHttpClient client = HttpClients.createDefault()) {
+ try (final CloseableHttpResponse response = client.execute(req)) {
+ return IOUtils.toString(response.getEntity().getContent());
+ }
+ }
+ }
+
+ private static String prepareForIndexing(final Notification n, final LongAccumulator acc)
+ throws JsonProcessingException {
+ acc.add(1);
+ return new ObjectMapper().writeValueAsString(n);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
index 0dc34cc42..762bfbb90 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
@@ -20,6 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+@Deprecated
public class IndexOnESJob {
private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class);
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
new file mode 100644
index 000000000..0748624f7
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
@@ -0,0 +1,113 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+import eu.dnetlib.broker.api.ShortEventMessage;
+import eu.dnetlib.broker.objects.OaBrokerEventPayload;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
+import scala.Tuple2;
+
+public class PartitionEventsByDsIdJob {
+
+ private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class);
+ private static final String OPENDOAR_NSPREFIX = "opendoar____::";
+
+ public static void main(final String[] args) throws Exception {
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ PartitionEventsByDsIdJob.class
+ .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final SparkConf conf = new SparkConf();
+
+ final String eventsPath = parser.get("workingPath") + "/events";
+ log.info("eventsPath: {}", eventsPath);
+
+ final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId";
+ log.info("partitionPath: {}", partitionPath);
+
+ runWithSparkSession(conf, isSparkSessionManaged, spark -> {
+
+ ClusterUtils
+ .readPath(spark, eventsPath, Event.class)
+ .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
+ .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX))
+ .map(
+ e -> new Tuple2<>(
+ StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX),
+ messageFromNotification(e)),
+ Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class)))
+ .write()
+ .partitionBy("_1")
+ .mode(SaveMode.Overwrite)
+ .json(partitionPath);
+
+ });
+ renameSubDirs(partitionPath);
+
+ }
+
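+ // Spark names each partition directory "_1=<opendoar id>": rename the directories stripping the "_1=" prefix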
+ private static void renameSubDirs(final String path) throws IOException {
+ final String prefix = "_1=";
+ final FileSystem fs = FileSystem.get(new Configuration());
+
+ log.info("** Renaming subdirs of " + path);
+ for (final FileStatus fileStatus : fs.listStatus(new Path(path))) {
+ if (fileStatus.isDirectory()) {
+ final Path oldPath = fileStatus.getPath();
+ final String oldName = oldPath.getName();
+ if (oldName.startsWith(prefix)) {
+ final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix));
+ log.info(" * " + oldPath.getName() + " -> " + newPath.getName());
+ fs.rename(oldPath, newPath);
+ }
+ }
+ }
+ }
+
+ private static ShortEventMessage messageFromNotification(final Event e) {
+ final Gson gson = new Gson();
+
+ final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class);
+
+ final ShortEventMessage res = new ShortEventMessage();
+
+ res.setOriginalId(payload.getResult().getOriginalId());
+ res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null));
+ res.setTopic(e.getTopic());
+ res.setTrust(payload.getTrust());
+ res.generateMessageFromObject(payload.getHighlight());
+
+ return res;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java
new file mode 100644
index 000000000..80cf7609b
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java
@@ -0,0 +1,44 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import eu.dnetlib.dhp.broker.model.Notification;
+
+public class NotificationGroup implements Serializable {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 720996471281158977L;
+
+ private List data = new ArrayList<>();
+
+ public NotificationGroup() {
+ }
+
+ public NotificationGroup(final List data) {
+ this.data = data;
+ }
+
+ public List getData() {
+ return data;
+ }
+
+ public void setData(final List data) {
+ this.data = data;
+ }
+
+ public NotificationGroup addElement(final Notification elem) {
+ data.add(elem);
+ return this;
+ }
+
+ public NotificationGroup addGroup(final NotificationGroup group) {
+ data.addAll(group.getData());
+ return this;
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java
new file mode 100644
index 000000000..adb1c753b
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java
@@ -0,0 +1,49 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import java.text.ParseException;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.time.DateUtils;
+
+public class SubscriptionUtils {
+
+ private static final long ONE_DAY = 86_400_000;
+
+ public static boolean verifyListSimilar(final List<String> list, final String value) {
+ return list.stream().anyMatch(s -> verifySimilar(s, value));
+ }
+
+ public static boolean verifyListExact(final List<String> list, final String value) {
+ return list.stream().anyMatch(s -> verifyExact(s, value));
+ }
+
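+ // true when every word of s2 is contained, ignoring case, in s1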
+ public static boolean verifySimilar(final String s1, final String s2) {
+ for (final String part : s2.split("\\W+")) {
+ if (!StringUtils.containsIgnoreCase(s1, part)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean verifyFloatRange(final float trust, final String min, final String max) {
+ return trust >= NumberUtils.toFloat(min, 0) && trust <= NumberUtils.toFloat(max, 1);
+ }
+
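+ // the upper bound is extended by one day, so the whole max date is included in the range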
+ public static boolean verifyDateRange(final long date, final String min, final String max) {
+ try {
+ return date >= DateUtils.parseDate(min, "yyyy-MM-dd").getTime()
+ && date < DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY;
+ } catch (final ParseException e) {
+ return false;
+ }
+ }
+
+ public static boolean verifyExact(final String s1, final String s2) {
+ return StringUtils.equalsIgnoreCase(s1, s2);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
index 8b628809d..979bac2da 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java
@@ -2,8 +2,6 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.stats;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
public class DatasourceStats implements Serializable {
@@ -15,7 +13,8 @@ public class DatasourceStats implements Serializable {
private String id;
private String name;
private String type;
- private Map<String, Long> topics = new HashMap<>();
+ private String topic;
+ private long size = 0l;
public String getId() {
return id;
@@ -41,21 +40,24 @@ public class DatasourceStats implements Serializable {
this.type = type;
}
- public Map<String, Long> getTopics() {
- return topics;
+ public String getTopic() {
+ return topic;
}
- public void setTopics(final Map<String, Long> topics) {
- this.topics = topics;
+ public void setTopic(final String topic) {
+ this.topic = topic;
}
- public void incrementTopic(final String topic, final long inc) {
- if (topics.containsKey(topic)) {
- topics.put(topic, topics.get(topic) + inc);
- } else {
- topics.put(topic, inc);
- }
+ public long getSize() {
+ return size;
+ }
+ public void setSize(final long size) {
+ this.size = size;
+ }
+
+ public void incrementSize(final long inc) {
+ this.size = this.size + inc;
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
index 5aa6698e3..240e2d211 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java
@@ -25,7 +25,8 @@ public class StatsAggregator extends Aggregator<Event, DatasourceStats, DatasourceStats> {
- stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue()));
+ stats0.incrementSize(stats1.getSize());
return stats0;
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java
new file mode 100644
index 000000000..cd403ade0
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java
@@ -0,0 +1,67 @@
+
+package eu.dnetlib.dhp.broker.oa.util.aggregators.subset;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.EventGroup;
+
+public class EventSubsetAggregator extends Aggregator<Event, EventGroup, EventGroup> {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -678071078823059805L;
+
+ private final int maxEventsForTopic;
+
+ public EventSubsetAggregator(final int maxEventsForTopic) {
+ this.maxEventsForTopic = maxEventsForTopic;
+ }
+
+ @Override
+ public EventGroup zero() {
+ return new EventGroup();
+ }
+
+ @Override
+ public EventGroup reduce(final EventGroup g, final Event e) {
+ if (g.getData().size() < maxEventsForTopic) {
+ g.getData().add(e);
+ }
+ return g;
+ }
+
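+ // merges two partial event groups, capping the total size at maxEventsForTopic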
+ @Override
+ public EventGroup merge(final EventGroup g0, final EventGroup g1) {
+ final int missing = maxEventsForTopic - g0.getData().size();
+
+ if (missing > 0) {
+ if (g1.getData().size() < missing) {
+ g0.getData().addAll(g1.getData());
+ } else {
+ g0.getData().addAll(g1.getData().subList(0, missing));
+ }
+ }
+
+ return g0;
+ }
+
+ @Override
+ public EventGroup finish(final EventGroup g) {
+ return g;
+ }
+
+ @Override
+ public Encoder<EventGroup> outputEncoder() {
+ return Encoders.bean(EventGroup.class);
+ }
+
+ @Override
+ public Encoder<EventGroup> bufferEncoder() {
+ return Encoders.bean(EventGroup.class);
+ }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 568d5dc5a..4184b71bd 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -25,13 +25,25 @@
			<description>a black list (comma separated, - for empty list) of datasource ids</description>
		</property>
		<property>
-			<name>esIndexName</name>
-			<description>the elasticsearch index name</description>
+			<name>esEventIndexName</name>
+			<description>the elasticsearch index name for events</description>
+		</property>
+		<property>
+			<name>esNotificationsIndexName</name>
+			<description>the elasticsearch index name for notifications</description>
		</property>
		<property>
			<name>esIndexHost</name>
			<description>the elasticsearch host</description>
		</property>
+		<property>
+			<name>maxIndexedEventsForDsAndTopic</name>
+			<description>the max number of events for each couple (ds/topic)</description>
+		</property>
+		<property>
+			<name>brokerApiBaseUrl</name>
+			<description>the url of the broker service api</description>
+		</property>
		<property>
			<name>sparkDriverMemory</name>
			<description>memory for driver process</description>
@@ -423,16 +435,16 @@
			<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
			<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
		</spark>
-		<ok to="index_es"/>
+		<ok to="index_event_subset"/>
		<error to="Kill"/>
	</action>

-	<action name="index_es">
+	<action name="index_event_subset">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
-			<name>IndexOnESJob</name>
-			<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
+			<name>IndexEventSubsetOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob</class>
			<jar>dhp-broker-events-${projectVersion}.jar</jar>
			<spark-opts>
				--executor-memory=${sparkExecutorMemory}
@@ -445,8 +457,36 @@
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--workingPath</arg><arg>${workingPath}</arg>
-			<arg>--index</arg><arg>${esIndexName}</arg>
+			<arg>--index</arg><arg>${esEventIndexName}</arg>
			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
+			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
+		</spark>
+		<ok to="index_notifications"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="index_notifications">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>IndexNotificationsOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexNotificationsJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.dynamicAllocation.maxExecutors="8"
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
+			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json
new file mode 100644
index 000000000..4921bc03e
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "o",
+ "paramLongName": "workingPath",
+ "paramDescription": "the workinh path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "idx",
+ "paramLongName": "index",
+ "paramDescription": "the ES index",
+ "paramRequired": true
+ },
+ {
+ "paramName": "es",
+ "paramLongName": "esHost",
+ "paramDescription": "the ES host",
+ "paramRequired": true
+ },
+ {
+ "paramName": "n",
+ "paramLongName": "maxEventsForTopic",
+ "paramDescription": "the max number of events for each couple (ds/topic)",
+ "paramRequired": true
+ },
+ {
+ "paramName": "broker",
+ "paramLongName": "brokerApiBaseUrl",
+ "paramDescription": "the url of the broker service api",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json
new file mode 100644
index 000000000..5eea894c8
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "o",
+ "paramLongName": "workingPath",
+ "paramDescription": "the workinh path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "idx",
+ "paramLongName": "index",
+ "paramDescription": "the ES index",
+ "paramRequired": true
+ },
+ {
+ "paramName": "es",
+ "paramLongName": "esHost",
+ "paramDescription": "the ES host",
+ "paramRequired": true
+ },
+ {
+ "paramName": "broker",
+ "paramLongName": "brokerApiBaseUrl",
+ "paramDescription": "the url of the broker service api",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml
new file mode 100644
index 000000000..2e0ed9aee
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+	<property>
+		<name>jobTracker</name>
+		<value>yarnRM</value>
+	</property>
+	<property>
+		<name>nameNode</name>
+		<value>hdfs://nameservice1</value>
+	</property>
+	<property>
+		<name>oozie.use.system.libpath</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>oozie.action.sharelib.for.spark</name>
+		<value>spark2</value>
+	</property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml
new file mode 100644
index 000000000..f629c2101
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml
@@ -0,0 +1,137 @@
+<workflow-app name="index_notifications" xmlns="uri:oozie:workflow:0.5">
+
+	<parameters>
+		<property>
+			<name>graphInputPath</name>
+			<description>the path where the graph is stored</description>
+		</property>
+		<property>
+			<name>workingPath</name>
+			<description>the path where the generated data will be stored</description>
+		</property>
+		<property>
+			<name>datasourceIdWhitelist</name>
+			<value>-</value>
+			<description>a white list (comma separated, - for empty list) of datasource ids</description>
+		</property>
+		<property>
+			<name>datasourceTypeWhitelist</name>
+			<value>-</value>
+			<description>a white list (comma separated, - for empty list) of datasource types</description>
+		</property>
+		<property>
+			<name>datasourceIdBlacklist</name>
+			<value>-</value>
+			<description>a black list (comma separated, - for empty list) of datasource ids</description>
+		</property>
+		<property>
+			<name>esEventIndexName</name>
+			<description>the elasticsearch index name for events</description>
+		</property>
+		<property>
+			<name>esNotificationsIndexName</name>
+			<description>the elasticsearch index name for notifications</description>
+		</property>
+		<property>
+			<name>esIndexHost</name>
+			<description>the elasticsearch host</description>
+		</property>
+		<property>
+			<name>maxIndexedEventsForDsAndTopic</name>
+			<description>the max number of events for each couple (ds/topic)</description>
+		</property>
+		<property>
+			<name>brokerApiBaseUrl</name>
+			<description>the url of the broker service api</description>
+		</property>
+		<property>
+			<name>sparkDriverMemory</name>
+			<description>memory for driver process</description>
+		</property>
+		<property>
+			<name>sparkExecutorMemory</name>
+			<description>memory for individual executor</description>
+		</property>
+		<property>
+			<name>sparkExecutorCores</name>
+			<description>number of cores used by single executor</description>
+		</property>
+		<property>
+			<name>oozieActionShareLibForSpark2</name>
+			<description>oozie action sharelib for spark 2.*</description>
+		</property>
+		<property>
+			<name>spark2ExtraListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+			<description>spark 2.* extra listeners classname</description>
+		</property>
+		<property>
+			<name>spark2SqlQueryExecutionListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+			<description>spark 2.* sql query execution listeners classname</description>
+		</property>
+		<property>
+			<name>spark2YarnHistoryServerAddress</name>
+			<description>spark 2.* yarn history server address</description>
+		</property>
+		<property>
+			<name>spark2EventLogDir</name>
+			<description>spark 2.* event log dir location</description>
+		</property>
+	</parameters>
+
+	<global>
+		<job-tracker>${jobTracker}</job-tracker>
+		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>mapreduce.job.queuename</name>
+				<value>${queueName}</value>
+			</property>
+			<property>
+				<name>oozie.launcher.mapred.job.queue.name</name>
+				<value>${oozieLauncherQueueName}</value>
+			</property>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
+	</global>
+
+	<start to="index_notifications"/>
+
+	<kill name="Kill">
+		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+	</kill>
+
+	<action name="index_notifications">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>IndexNotificationsOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexNotificationsJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.dynamicAllocation.maxExecutors="8"
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
+			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
+		</spark>
+		<ok to="End"/>
+		<error to="Kill"/>
+	</action>
+
+	<end name="End"/>
+
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index 3c43c0b1b..407b9f42f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -8,6 +8,53 @@
			<name>workingPath</name>
			<description>the path where the generated data will be stored</description>
		</property>
+		<property>
+			<name>datasourceIdWhitelist</name>
+			<value>-</value>
+			<description>a white list (comma separated, - for empty list) of datasource ids</description>
+		</property>
+		<property>
+			<name>datasourceTypeWhitelist</name>
+			<value>-</value>
+			<description>a white list (comma separated, - for empty list) of datasource types</description>
+		</property>
+		<property>
+			<name>datasourceIdBlacklist</name>
+			<value>-</value>
+			<description>a black list (comma separated, - for empty list) of datasource ids</description>
+		</property>
+		<property>
+			<name>esEventIndexName</name>
+			<description>the elasticsearch index name for events</description>
+		</property>
+		<property>
+			<name>esNotificationsIndexName</name>
+			<description>the elasticsearch index name for notifications</description>
+		</property>
+		<property>
+			<name>esIndexHost</name>
+			<description>the elasticsearch host</description>
+		</property>
+		<property>
+			<name>maxIndexedEventsForDsAndTopic</name>
+			<description>the max number of events for each couple (ds/topic)</description>
+		</property>
+		<property>
+			<name>brokerApiBaseUrl</name>
+			<description>the url of the broker service api</description>
+		</property>
+		<property>
+			<name>brokerDbUrl</name>
+			<description>the url of the broker database</description>
+		</property>
+		<property>
+			<name>brokerDbUser</name>
+			<description>the user of the broker database</description>
+		</property>
+		<property>
+			<name>brokerDbPassword</name>
+			<description>the password of the broker database</description>
+		</property>
		<property>
			<name>sparkDriverMemory</name>
@@ -64,23 +111,23 @@
-	<start to="index_es"/>
+	<start to="stats"/>

	<kill name="Kill">
		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>

-	<action name="index_es">
+	<action name="stats">
		<spark xmlns="uri:oozie:spark-action:0.2">
			<master>yarn</master>
			<mode>cluster</mode>
-			<name>IndexOnESJob</name>
-			<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
+			<name>GenerateStatsJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
			<jar>dhp-broker-events-${projectVersion}.jar</jar>
			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
				--executor-memory=${sparkExecutorMemory}
				--driver-memory=${sparkDriverMemory}
-				--conf spark.dynamicAllocation.maxExecutors="8"
				--conf spark.extraListeners=${spark2ExtraListeners}
				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -88,8 +135,10 @@
				--conf spark.sql.shuffle.partitions=3840
			</spark-opts>
			<arg>--workingPath</arg><arg>${workingPath}</arg>
-			<arg>--index</arg><arg>${esIndexName}</arg>
-			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--dbUrl</arg><arg>${brokerDbUrl}</arg>
+			<arg>--dbUser</arg><arg>${brokerDbUser}</arg>
+			<arg>--dbPassword</arg><arg>${brokerDbPassword}</arg>
+			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json
new file mode 100644
index 000000000..15d7d251f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "wp",
+ "paramLongName": "workingPath",
+ "paramDescription": "the working path",
+ "paramRequired": true
+ },
+ {
+ "paramName": "dburl",
+ "paramLongName": "dbUrl",
+ "paramDescription": "the broker database url",
+ "paramRequired": true
+ },
+ {
+ "paramName": "u",
+ "paramLongName": "dbUser",
+ "paramDescription": "the broker database user",
+ "paramRequired": true
+ },
+ {
+ "paramName": "p",
+ "paramLongName": "dbPassword",
+ "paramDescription": "the broker database password",
+ "paramRequired": true
+ },
+ {
+ "paramName": "broker",
+ "paramLongName": "brokerApiBaseUrl",
+ "paramDescription": "the url of the broker service api",
+ "paramRequired": true
+ }
+]
diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java
new file mode 100644
index 000000000..b532aa9f7
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java
@@ -0,0 +1,52 @@
+
+package eu.dnetlib.dhp.broker.oa.util;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.Test;
+
+class SubscriptionUtilsTest {
+
+ @Test
+ void testVerifyListSimilar() {
+ assertTrue(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "artini"));
+ assertFalse(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "bardi"));
+ }
+
+ @Test
+ void testVerifyListExact() {
+ assertTrue(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "perl"));
+ assertFalse(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "C"));
+ }
+
+ @Test
+ void testVerifySimilar() {
+ assertTrue(SubscriptionUtils.verifySimilar("Java Programming", "java"));
+ assertFalse(SubscriptionUtils.verifySimilar("Java Programming", "soap"));
+ }
+
+ @Test
+ void testVerifyFloatRange() {
+ assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "0.4", "0.6"));
+ assertFalse(SubscriptionUtils.verifyFloatRange(0.8f, "0.4", "0.6"));
+ assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "", ""));
+ }
+
+ @Test
+ void testVerifyDateRange() {
+ final long date = 1282738478000l; // 25 August 2010
+
+ assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "2011-01-01"));
+ assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "2021-01-01"));
+ }
+
+ @Test
+ void testVerifyExact() {
+ assertTrue(SubscriptionUtils.verifyExact("Java Programming", "java programming"));
+ assertFalse(SubscriptionUtils.verifyExact("Java Programming", "soap programming"));
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
index 9bb6653d0..f5ffb6e0f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java
@@ -373,6 +373,7 @@ public class ResultMapper implements Serializable {
private static Instance getGraphInstance(eu.dnetlib.dhp.schema.oaf.Instance i) {
Instance instance = new Instance();
+
setCommonValue(i, instance);
return instance;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index 87c935d83..1e7b56ee9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -577,7 +577,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null;
if (issn != null || eissn != null || lissn != null) {
- return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info);
+ return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info);
}
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql
index 43b0f8f4b..7ca672835 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql
@@ -3,7 +3,7 @@ SELECT
d.id || array_agg(distinct di.pid) AS identities,
d.officialname AS officialname,
d.englishname AS englishname,
- d.contactemail AS contactemail,
+ d.contactemail AS contactemail,
CASE
WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1'])
THEN
@@ -85,7 +85,7 @@ SELECT
dc.officialname AS collectedfromname,
d.typology||'@@@dnet:datasource_typologies' AS datasourcetype,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
- d.issn || ' @@@ ' || d.eissn || ' @@@ ' || d.lissn AS journal
+ concat_ws(' @@@ ', d.issn, d.eissn, d.lissn) AS journal
FROM dsm_datasources d
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
index 22fcb36c9..011cc18e6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java
@@ -73,12 +73,16 @@ public class MigrateDbEntitiesApplicationTest {
final Datasource ds = (Datasource) list.get(0);
assertValidId(ds.getId());
assertValidId(ds.getCollectedfrom().get(0).getKey());
- assertEquals(ds.getOfficialname().getValue(), getValueAsString("officialname", fields));
- assertEquals(ds.getEnglishname().getValue(), getValueAsString("englishname", fields));
- assertEquals(ds.getContactemail().getValue(), getValueAsString("contactemail", fields));
- assertEquals(ds.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields));
- assertEquals(ds.getNamespaceprefix().getValue(), getValueAsString("namespaceprefix", fields));
- assertEquals(ds.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields));
+ assertEquals(getValueAsString("officialname", fields), ds.getOfficialname().getValue());
+ assertEquals(getValueAsString("englishname", fields), ds.getEnglishname().getValue());
+ assertEquals(getValueAsString("contactemail", fields), ds.getContactemail().getValue());
+ assertEquals(getValueAsString("websiteurl", fields), ds.getWebsiteurl().getValue());
+ assertEquals(getValueAsString("namespaceprefix", fields), ds.getNamespaceprefix().getValue());
+ assertEquals(getValueAsString("collectedfromname", fields), ds.getCollectedfrom().get(0).getValue());
+ assertEquals(getValueAsString("officialname", fields), ds.getJournal().getName());
+ assertEquals("2579-5449", ds.getJournal().getIssnPrinted());
+ assertEquals("2597-6540", ds.getJournal().getIssnOnline());
+ assertEquals(null, ds.getJournal().getIssnLinking());
}
@Test
@@ -92,9 +96,11 @@ public class MigrateDbEntitiesApplicationTest {
final Project p = (Project) list.get(0);
assertValidId(p.getId());
assertValidId(p.getCollectedfrom().get(0).getKey());
- assertEquals(p.getAcronym().getValue(), getValueAsString("acronym", fields));
- assertEquals(p.getTitle().getValue(), getValueAsString("title", fields));
- assertEquals(p.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields));
+ assertEquals(getValueAsString("acronym", fields), p.getAcronym().getValue());
+ assertEquals(getValueAsString("title", fields), p.getTitle().getValue());
+ assertEquals(getValueAsString("collectedfromname", fields), p.getCollectedfrom().get(0).getValue());
+ assertEquals(getValueAsFloat("fundedamount", fields), p.getFundedamount());
+ assertEquals(getValueAsFloat("totalcost", fields), p.getTotalcost());
}
@Test
@@ -110,14 +116,14 @@ public class MigrateDbEntitiesApplicationTest {
final Organization o = (Organization) list.get(0);
assertValidId(o.getId());
assertValidId(o.getCollectedfrom().get(0).getKey());
- assertEquals(o.getLegalshortname().getValue(), getValueAsString("legalshortname", fields));
- assertEquals(o.getLegalname().getValue(), getValueAsString("legalname", fields));
- assertEquals(o.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields));
- assertEquals(o.getCountry().getClassid(), getValueAsString("country", fields).split("@@@")[0]);
- assertEquals(o.getCountry().getClassname(), getValueAsString("country", fields).split("@@@")[0]);
- assertEquals(o.getCountry().getSchemeid(), getValueAsString("country", fields).split("@@@")[1]);
- assertEquals(o.getCountry().getSchemename(), getValueAsString("country", fields).split("@@@")[1]);
- assertEquals(o.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields));
+ assertEquals(getValueAsString("legalshortname", fields), o.getLegalshortname().getValue());
+ assertEquals(getValueAsString("legalname", fields), o.getLegalname().getValue());
+ assertEquals(getValueAsString("websiteurl", fields), o.getWebsiteurl().getValue());
+ assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassid());
+ assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassname());
+ assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemeid());
+ assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemename());
+ assertEquals(getValueAsString("collectedfromname", fields), o.getCollectedfrom().get(0).getValue());
}
@Test
@@ -322,12 +328,20 @@ public class MigrateDbEntitiesApplicationTest {
}
private String getValueAsString(final String name, final List<TypedField> fields) {
+ return getValueAs(name, fields);
+ }
+
+ private Float getValueAsFloat(final String name, final List<TypedField> fields) {
+ return new Float(getValueAs(name, fields).toString());
+ }
+
+ private <T> T getValueAs(final String name, final List<TypedField> fields) {
return fields
.stream()
.filter(f -> f.getField().equals(name))
.map(TypedField::getValue)
.filter(Objects::nonNull)
- .map(o -> o.toString())
+ .map(o -> (T) o)
.findFirst()
.get();
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml
new file mode 100644
index 000000000..e69de29bb
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
index d6109cac1..a25215ca3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
+++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json
@@ -142,12 +142,12 @@
{
"field": "totalcost",
"type": "double",
- "value": null
+ "value": 157846
},
{
"field": "fundedamount",
"type": "double",
- "value": null
+ "value": 157846
},
{
"field": "collectedfromid",
diff --git a/pom.xml b/pom.xml
index e88e1d51b..52edd497f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -663,7 +663,7 @@
3.3.3
3.4.2
[2.12,3.0)
-		<dnet.openaire.broker.common>3.1.0</dnet.openaire.broker.common>
+		<dnet.openaire.broker.common>3.1.1</dnet.openaire.broker.common>
7.5.0
4.7.2
1.1