From 6e60bf026aff65fdc7e6cb87cd1d066574128b0a Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 19 Aug 2020 12:39:22 +0200 Subject: [PATCH 01/18] indexing only a subset of eventsa --- .../dhp/broker/model/EventFactory.java | 17 +-- .../dhp/broker/oa/GenerateStatsJob.java | 2 +- .../dhp/broker/oa/IndexEventSubsetJob.java | 100 ++++++++++++++++++ .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 1 + .../subset/EventSubsetAggregator.java | 67 ++++++++++++ .../oa/generate_all/oozie_app/workflow.xml | 17 +-- .../dhp/broker/oa/index_event_subset.json | 26 +++++ .../broker/oa/partial/oozie_app/workflow.xml | 38 ++++++- 8 files changed, 244 insertions(+), 24 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 0cb0d78018..429eb7d11b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.broker.model; import java.text.ParseException; -import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -19,16 +18,12 @@ public class EventFactory { private final static String PRODUCER_ID = "OpenAIRE"; - private static final int TTH_DAYS = 365; - private final static String[] DATE_PATTERNS = { "yyyy-MM-dd" }; public static Event newBrokerEvent(final UpdateInfo updateInfo) { - final long now = new Date().getTime(); - final Event res = new Event(); final MappedFields map = createMapFromResult(updateInfo); @@ -44,8 +39,8 @@ public class EventFactory { res.setPayload(updateInfo.asBrokerPayload().toJSON()); res.setMap(map); res.setTopic(updateInfo.getTopicPath()); - res.setCreationDate(now); - res.setExpiryDate(calculateExpiryDate(now)); + res.setCreationDate(0l); + res.setExpiryDate(Long.MAX_VALUE); res.setInstantMessage(false); return res; @@ -96,7 +91,9 @@ public class EventFactory { return map; } - private static String calculateEventId(final String topic, final String dsId, final String publicationId, + private static String calculateEventId(final String topic, + final String dsId, + final String publicationId, final String value) { return "event-" + DigestUtils.md5Hex(topic).substring(0, 4) + "-" @@ -105,10 +102,6 @@ public class EventFactory { + DigestUtils.md5Hex(value).substring(0, 5); } - private static long calculateExpiryDate(final long now) { - return now + TTH_DAYS * 24 * 60 * 60 * 1000; - } - private static long parseDateTolong(final String date) { if (StringUtils.isBlank(date)) { return -1; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index a51601cd76..8a7229b647 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -28,7 +28,7 @@ public class GenerateStatsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - IndexOnESJob.class + GenerateStatsJob.class .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java new file mode 100644 index 0000000000..17451d067d --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -0,0 +1,100 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; +import eu.dnetlib.dhp.broker.oa.util.aggregators.subset.EventSubsetAggregator; + +public class IndexEventSubsetJob { + + private static final Logger log = LoggerFactory.getLogger(IndexEventSubsetJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + IndexEventSubsetJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String index = parser.get("index"); + log.info("index: {}", index); + + final String indexHost = parser.get("esHost"); + log.info("indexHost: {}", indexHost); + + final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); + log.info("maxEventsForTopic: {}", maxEventsForTopic); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final TypedColumn aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); + + final long now = new Date().getTime(); + + final Dataset subset = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .groupByKey(e -> e.getTopic() + '@' + e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(EventGroup.class)) + .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); + + final JavaRDD inputRdd = subset + .map(e -> prepareEventForIndexing(e, now, total), Encoders.STRING()) + .javaRDD(); + + final Map esCfg = new HashMap<>(); + // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); + + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + esCfg.put("es.nodes.wan.only", "true"); + + JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + } + + private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc) + throws JsonProcessingException { + acc.add(1); + + e.setCreationDate(creationDate); + e.setExpiryDate(Long.MAX_VALUE); + + return new ObjectMapper().writeValueAsString(e); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 0dc34cc42c..762bfbb903 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -20,6 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +@Deprecated public class IndexOnESJob { private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java new file mode 100644 index 0000000000..cd403ade0a --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java @@ -0,0 +1,67 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.subset; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; + +public class EventSubsetAggregator extends Aggregator { + + /** + * + */ + private static final long serialVersionUID = -678071078823059805L; + + private final int maxEventsForTopic; + + public EventSubsetAggregator(final int maxEventsForTopic) { + this.maxEventsForTopic = maxEventsForTopic; + } + + @Override + public EventGroup zero() { + return new EventGroup(); + } + + @Override + public EventGroup reduce(final EventGroup g, final Event e) { + if (g.getData().size() < maxEventsForTopic) { + g.getData().add(e); + } + return g; + } + + @Override + public EventGroup merge(final EventGroup g0, final EventGroup g1) { + final int missing = maxEventsForTopic - g0.getData().size(); + + if (missing > 0) { + if (g1.getData().size() < missing) { + g0.getData().addAll(g1.getData()); + } else { + g0.getData().addAll(g1.getData().subList(0, missing)); + } + } + + return g0; + } + + @Override + public EventGroup finish(final EventGroup g) { + return g; + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(EventGroup.class); + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(EventGroup.class); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 568d5dc5a0..a10794f637 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -25,13 +25,17 @@ a black list (comma separeted, - for empty list) of datasource ids - esIndexName + esEventIndexName the elasticsearch index name esIndexHost the elasticsearch host + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) + sparkDriverMemory memory for driver process @@ -423,16 +427,16 @@ --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} - + - + yarn cluster - IndexOnESJob - eu.dnetlib.dhp.broker.oa.IndexOnESJob + IndexEventSubsetOnESJob + eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob dhp-broker-events-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -445,8 +449,9 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esIndexName} + --index${esEventIndexName} --esHost${esIndexHost} + --maxEventsForTopic${maxIndexedEventsForDsAndTopic} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json new file mode 100644 index 0000000000..72703ae335 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "o", + "paramLongName": "workingPath", + "paramDescription": "the workinh path", + "paramRequired": true + }, + { + "paramName": "idx", + "paramLongName": "index", + "paramDescription": "the ES index", + "paramRequired": true + }, + { + "paramName": "es", + "paramLongName": "esHost", + "paramDescription": "the ES host", + "paramRequired": true + }, + { + "paramName": "n", + "paramLongName": "maxEventsForTopic", + "paramDescription": "the max number of events for each couple (ds/topic)", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 3c43c0b1b8..306343dbe4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -8,6 +8,33 @@ workingPath the path where the the generated data will be stored + + + datasourceIdWhitelist + - + a white list (comma separeted, - for empty list) of datasource ids + + + datasourceTypeWhitelist + - + a white list (comma separeted, - for empty list) of datasource types + + + datasourceIdBlacklist + - + a black list (comma separeted, - for empty list) of datasource ids + + + esEventIndexName + the elasticsearch index name + + + esIndexHost + the elasticsearch host + + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) sparkDriverMemory @@ -64,18 +91,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexOnESJob - eu.dnetlib.dhp.broker.oa.IndexOnESJob + IndexEventSubsetOnESJob + eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob dhp-broker-events-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -88,8 +115,9 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esIndexName} + --index${esEventIndexName} --esHost${esIndexHost} + --maxEventsForTopic${maxIndexedEventsForDsAndTopic} From da470422d3470338c339dd76550aab57874fb385 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 21 Aug 2020 14:52:48 +0200 Subject: [PATCH 02/18] deleting events --- dhp-workflows/dhp-broker-events/pom.xml | 4 +++ .../dhp/broker/oa/IndexEventSubsetJob.java | 26 +++++++++++++++++++ .../oa/generate_all/oozie_app/workflow.xml | 5 ++++ .../dhp/broker/oa/index_event_subset.json | 6 +++++ .../broker/oa/partial/oozie_app/workflow.xml | 5 ++++ 5 files changed, 46 insertions(+) diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index f98708c648..75cc0ea095 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -31,6 +31,10 @@ elasticsearch-hadoop + + org.apache.httpcomponents + httpclient + eu.dnetlib.dhp diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index 17451d067d..d3cbe0034f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -7,6 +7,10 @@ import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; @@ -54,6 +58,9 @@ public class IndexEventSubsetJob { final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); log.info("maxEventsForTopic: {}", maxEventsForTopic); + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final TypedColumn aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn(); @@ -84,7 +91,26 @@ public class IndexEventSubsetJob { esCfg.put("es.batch.size.entries", "200"); esCfg.put("es.nodes.wan.only", "true"); + log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + log.info("*** End indexing"); + + log.info("*** Deleting old events"); + final String message = deleteOldEvents(brokerApiBaseUrl, now - 1000); + log.info("*** Deleted events: " + message); + + } + + private static String deleteOldEvents(final String brokerApiBaseUrl, final long l) throws Exception { + final String url = brokerApiBaseUrl + "/api/events/byCreationDate/0/" + l; + final HttpDelete req = new HttpDelete(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc) diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index a10794f637..2e669676bf 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -36,6 +36,10 @@ maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) + + brokerApiBaseUrl + the url of the broker service api + sparkDriverMemory memory for driver process @@ -452,6 +456,7 @@ --index${esEventIndexName} --esHost${esIndexHost} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json index 72703ae335..4921bc03e2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -22,5 +22,11 @@ "paramLongName": "maxEventsForTopic", "paramDescription": "the max number of events for each couple (ds/topic)", "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 306343dbe4..0b0557693c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -36,6 +36,10 @@ maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) + + brokerApiBaseUrl + the url of the broker service api + sparkDriverMemory memory for driver process @@ -118,6 +122,7 @@ --index${esEventIndexName} --esHost${esIndexHost} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} + --brokerApiBaseUrl${brokerApiBaseUrl} From 82ed8edafd839bc793e0de8b368e1c48e1973811 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 26 Aug 2020 15:10:48 +0200 Subject: [PATCH 03/18] notification indexing --- .../dhp/broker/model/ConditionParams.java | 31 +++ .../dhp/broker/model/MapCondition.java | 37 ++++ .../dhp/broker/model/Notification.java | 93 ++++++++ .../dhp/broker/model/Subscription.java | 74 +++++++ .../dhp/broker/oa/IndexNotificationsJob.java | 204 ++++++++++++++++++ .../dhp/broker/oa/util/NotificationGroup.java | 44 ++++ .../dhp/broker/oa/util/SubscriptionUtils.java | 49 +++++ .../oa/generate_all/oozie_app/workflow.xml | 32 ++- .../dhp/broker/oa/index_notifications.json | 26 +++ .../broker/oa/partial/oozie_app/workflow.xml | 18 +- .../broker/oa/util/SubscriptionUtilsTest.java | 52 +++++ 11 files changed, 652 insertions(+), 8 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json create mode 100644 dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java new file mode 100644 index 0000000000..375300c056 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ConditionParams.java @@ -0,0 +1,31 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; + +public class ConditionParams implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 2719901844537516110L; + + private String value; + private String otherValue; + + public String getValue() { + return value; + } + + public void setValue(final String value) { + this.value = value; + } + + public String getOtherValue() { + return otherValue; + } + + public void setOtherValue(final String otherValue) { + this.otherValue = otherValue; + } +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java new file mode 100644 index 0000000000..069eee2a82 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/MapCondition.java @@ -0,0 +1,37 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class MapCondition implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -7137490975452466813L; + + private String field; + private List listParams = new ArrayList<>(); + + public String getField() { + return field; + } + + public void setField(final String field) { + this.field = field; + } + + public List getListParams() { + return listParams; + } + + public void setListParams(final List listParams) { + this.listParams = listParams; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java new file mode 100644 index 0000000000..4ef25bf1f4 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Notification.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; + +public class Notification implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -1770420972526995727L; + + private String notificationId; + + private String subscriptionId; + + private String producerId; + + private String eventId; + + private String topic; + + private Long date; + + private String payload; + + private MappedFields map; + + public String getNotificationId() { + return notificationId; + } + + public void setNotificationId(final String notificationId) { + this.notificationId = notificationId; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + public void setSubscriptionId(final String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + public String getProducerId() { + return producerId; + } + + public void setProducerId(final String producerId) { + this.producerId = producerId; + } + + public String getEventId() { + return eventId; + } + + public void setEventId(final String eventId) { + this.eventId = eventId; + } + + public String getTopic() { + return topic; + } + + public void setTopic(final String topic) { + this.topic = topic; + } + + public String getPayload() { + return payload; + } + + public void setPayload(final String payload) { + this.payload = payload; + } + + public MappedFields getMap() { + return map; + } + + public void setMap(final MappedFields map) { + this.map = map; + } + + public Long getDate() { + return date; + } + + public void setDate(final Long date) { + this.date = date; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java new file mode 100644 index 0000000000..6cfd8b0a3c --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Subscription.java @@ -0,0 +1,74 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.ObjectMapper; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class Subscription implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 1051702214740830010L; + + private String subscriptionId; + + private String subscriber; + + private String topic; + + private String conditions; + + public String getSubscriptionId() { + return subscriptionId; + } + + public void setSubscriptionId(final String subscriptionId) { + this.subscriptionId = subscriptionId; + } + + public String getSubscriber() { + return subscriber; + } + + public void setSubscriber(final String subscriber) { + this.subscriber = subscriber; + } + + public String getTopic() { + return topic; + } + + public void setTopic(final String topic) { + this.topic = topic; + } + + public String getConditions() { + return conditions; + } + + public void setConditions(final String conditions) { + this.conditions = conditions; + } + + public Map> conditionsAsMap() { + final ObjectMapper mapper = new ObjectMapper(); + try { + final List list = mapper + .readValue( + getConditions(), mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class)); + return list + .stream() + .filter(mc -> !mc.getListParams().isEmpty()) + .collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams)); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java new file mode 100644 index 0000000000..6de00dbee5 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -0,0 +1,204 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Notification; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +public class IndexNotificationsJob { + + private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String index = parser.get("index"); + log.info("index: {}", index); + + final String indexHost = parser.get("esHost"); + log.info("indexHost: {}", indexHost); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); + + final long now = new Date().getTime(); + + final List subscriptions = listSubscriptions(brokerApiBaseUrl); + + log.info("Number of subscriptions: " + subscriptions.size()); + + if (subscriptions.size() > 0) { + final Dataset notifications = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .map(e -> generateNotifications(e, subscriptions, now), Encoders.bean(NotificationGroup.class)) + .flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class)); + + final JavaRDD inputRdd = notifications + .map(n -> prepareForIndexing(n, total), Encoders.STRING()) + .javaRDD(); + + final Map esCfg = new HashMap<>(); + // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); + + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + esCfg.put("es.nodes.wan.only", "true"); + + log.info("*** Start indexing"); + JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + log.info("*** End indexing"); + } + } + + private static NotificationGroup generateNotifications(final Event e, + final List subscriptions, + final long date) { + final List list = subscriptions + .stream() + .filter(s -> s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) + .map(s -> generateNotification(s, e, date)) + .collect(Collectors.toList()); + + return new NotificationGroup(list); + } + + private static Notification generateNotification(final Subscription s, final Event e, final long date) { + final Notification n = new Notification(); + n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); + n.setSubscriptionId(s.getSubscriptionId()); + n.setEventId(e.getEventId()); + n.setProducerId(e.getProducerId()); + n.setTopic(e.getTopic()); + n.setPayload(e.getPayload()); + n.setMap(e.getMap()); + n.setDate(date); + return n; + } + + private static boolean verifyConditions(final MappedFields map, + final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + + if (conditions.containsKey("trust") + && !SubscriptionUtils + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { + return false; + } + + if (conditions.containsKey("targetDateofacceptance") && !conditions + .get("targetDateofacceptance") + .stream() + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + return false; + } + + if (conditions.containsKey("targetResultTitle") + && !conditions + .get("targetResultTitle") + .stream() + .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetAuthors") + && !conditions + .get("targetAuthors") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetSubjects") + && !conditions + .get("targetSubjects") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue()))) { + return false; + } + + return true; + + } + + private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { + final String url = brokerApiBaseUrl + "/api/subscriptions"; + final HttpGet req = new HttpGet(url); + + final ObjectMapper mapper = new ObjectMapper(); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String s = IOUtils.toString(response.getEntity().getContent()); + return mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); + } + } + + } + + private static String prepareForIndexing(final Notification n, final LongAccumulator acc) + throws JsonProcessingException { + acc.add(1); + return new ObjectMapper().writeValueAsString(n); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java new file mode 100644 index 0000000000..80cf7609b9 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/NotificationGroup.java @@ -0,0 +1,44 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import eu.dnetlib.dhp.broker.model.Notification; + +public class NotificationGroup implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 720996471281158977L; + + private List data = new ArrayList<>(); + + public NotificationGroup() { + } + + public NotificationGroup(final List data) { + this.data = data; + } + + public List getData() { + return data; + } + + public void setData(final List data) { + this.data = data; + } + + public NotificationGroup addElement(final Notification elem) { + data.add(elem); + return this; + } + + public NotificationGroup addGroup(final NotificationGroup group) { + data.addAll(group.getData()); + return this; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java new file mode 100644 index 0000000000..adb1c753b1 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtils.java @@ -0,0 +1,49 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import java.text.ParseException; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.commons.lang3.time.DateUtils; + +public class SubscriptionUtils { + + private static final long ONE_DAY = 86_400_000; + + public static boolean verifyListSimilar(final List list, final String value) { + return list.stream().anyMatch(s -> verifySimilar(s, value)); + } + + public static boolean verifyListExact(final List list, final String value) { + return list.stream().anyMatch(s -> verifyExact(s, value)); + } + + public static boolean verifySimilar(final String s1, final String s2) { + for (final String part : s2.split("\\W+")) { + if (!StringUtils.containsIgnoreCase(s1, part)) { + return false; + } + } + return true; + } + + public static boolean verifyFloatRange(final float trust, final String min, final String max) { + return trust >= NumberUtils.toFloat(min, 0) && trust <= NumberUtils.toFloat(max, 1); + } + + public static boolean verifyDateRange(final long date, final String min, final String max) { + try { + return date >= DateUtils.parseDate(min, "yyyy-MM-dd").getTime() + && date < DateUtils.parseDate(max, "yyyy-MM-dd").getTime() + ONE_DAY; + } catch (final ParseException e) { + return false; + } + } + + public static boolean verifyExact(final String s1, final String s2) { + return StringUtils.equalsIgnoreCase(s1, s2); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 2e669676bf..4184b71bd9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -26,7 +26,11 @@ esEventIndexName - the elasticsearch index name + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications esIndexHost @@ -458,6 +462,32 @@ --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + yarn + cluster + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esNotificationsIndexName} + --esHost${esIndexHost} + --brokerApiBaseUrl${brokerApiBaseUrl} + diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json new file mode 100644 index 0000000000..5eea894c8e --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "o", + "paramLongName": "workingPath", + "paramDescription": "the workinh path", + "paramRequired": true + }, + { + "paramName": "idx", + "paramLongName": "index", + "paramDescription": "the ES index", + "paramRequired": true + }, + { + "paramName": "es", + "paramLongName": "esHost", + "paramDescription": "the ES host", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 0b0557693c..f629c2101e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -26,7 +26,11 @@ esEventIndexName - the elasticsearch index name + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications esIndexHost @@ -95,18 +99,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexEventSubsetOnESJob - eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob dhp-broker-events-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -119,14 +123,14 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esEventIndexName} + --index${esNotificationsIndexName} --esHost${esIndexHost} - --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} + diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java new file mode 100644 index 0000000000..b532aa9f7e --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/SubscriptionUtilsTest.java @@ -0,0 +1,52 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +class SubscriptionUtilsTest { + + @Test + void testVerifyListSimilar() { + assertTrue(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "artini")); + assertFalse(SubscriptionUtils.verifyListSimilar(Arrays.asList("Michele Artini", "Claudio Atzori"), "bardi")); + } + + @Test + void testVerifyListExact() { + assertTrue(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "perl")); + assertFalse(SubscriptionUtils.verifyListExact(Arrays.asList("Java", "Perl"), "C")); + } + + @Test + void testVerifySimilar() { + assertTrue(SubscriptionUtils.verifySimilar("Java Programming", "java")); + assertFalse(SubscriptionUtils.verifySimilar("Java Programming", "soap")); + } + + @Test + void testVerifyFloatRange() { + assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "0.4", "0.6")); + assertFalse(SubscriptionUtils.verifyFloatRange(0.8f, "0.4", "0.6")); + assertTrue(SubscriptionUtils.verifyFloatRange(0.5f, "", "")); + } + + @Test + void testVerifyDateRange() { + final long date = 1282738478000l; // 25 August 2010 + + assertTrue(SubscriptionUtils.verifyDateRange(date, "2010-01-01", "2011-01-01")); + assertFalse(SubscriptionUtils.verifyDateRange(date, "2020-01-01", "2021-01-01")); + } + + @Test + void testVerifyExact() { + assertTrue(SubscriptionUtils.verifyExact("Java Programming", "java programming")); + assertFalse(SubscriptionUtils.verifyExact("Java Programming", "soap programming")); + } + +} From bb459caf6918bbb469132f17815d9e22451f0fce Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 27 Aug 2020 11:01:21 +0200 Subject: [PATCH 04/18] support for all topic subscriptions --- .../java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 6de00dbee5..c64cf6a7aa 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -9,6 +9,7 @@ import java.util.stream.Collectors; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -105,7 +106,7 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> s.getTopic().equals(e.getTopic())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); From 80eba5b4977684a8846f30ddb0c64fe1bda1f218 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 7 Sep 2020 11:40:00 +0200 Subject: [PATCH 05/18] avoid to reformat javadoc comments --- .../src/main/resources/eclipse/formatter_dnet.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml b/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml index fcba2c4b29..e4d85bf399 100644 --- a/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml +++ b/dhp-build/dhp-code-style/src/main/resources/eclipse/formatter_dnet.xml @@ -19,7 +19,7 @@ - + From 8a523474b73f64497b34ec94888569afd94ba073 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 7 Sep 2020 11:40:16 +0200 Subject: [PATCH 06/18] code formatting --- .../api/MissingConceptDoiException.java | 7 +- .../dhp/common/api/ZenodoAPIClient.java | 42 +++++------ .../common/api/zenodo/ZenodoModelList.java | 1 + .../dnetlib/dhp/schema/dump/oaf/Funder.java | 43 +++++------ .../dnetlib/dhp/schema/dump/oaf/Instance.java | 4 +- .../dnetlib/dhp/schema/dump/oaf/Project.java | 57 +++++++------- .../schema/dump/oaf/graph/Organization.java | 1 - .../dhp/oa/graph/dump/ResultMapper.java | 75 +++++++++---------- .../dhp/oa/graph/dump/SendToZenodoHDFS.java | 2 + .../graph/dump/graph/DumpGraphEntities.java | 4 +- .../dump/graph/SparkOrganizationRelation.java | 1 - .../oa/graph/dump/SplitForCommunityTest.java | 3 + .../graph/ExtractRelationFromEntityTest.java | 1 - 13 files changed, 122 insertions(+), 119 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/MissingConceptDoiException.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/MissingConceptDoiException.java index f2160d4f23..b75872eb48 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/MissingConceptDoiException.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/MissingConceptDoiException.java @@ -1,7 +1,8 @@ + package eu.dnetlib.dhp.common.api; public class MissingConceptDoiException extends Throwable { - public MissingConceptDoiException(String message) { - super(message); - } + public MissingConceptDoiException(String message) { + super(message); + } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java index eebb14b177..f2dd4f0ac8 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/ZenodoAPIClient.java @@ -12,14 +12,12 @@ import okhttp3.*; public class ZenodoAPIClient implements Serializable { - String urlString; String bucket; String deposition_id; String access_token; - public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8"); private static final MediaType MEDIA_TYPE_ZIP = MediaType.parse("application/zip"); @@ -40,7 +38,9 @@ public class ZenodoAPIClient implements Serializable { this.bucket = bucket; } - public void setDeposition_id(String deposition_id){this.deposition_id = deposition_id;} + public void setDeposition_id(String deposition_id) { + this.deposition_id = deposition_id; + } public ZenodoAPIClient(String urlString, String access_token) throws IOException { @@ -182,10 +182,10 @@ public class ZenodoAPIClient implements Serializable { OkHttpClient httpClient = new OkHttpClient(); Request request = new Request.Builder() - .url(urlString + "/" + deposition_id + "/actions/newversion") - .addHeader("Authorization", "Bearer " + access_token) - .post(RequestBody.create(MEDIA_TYPE_JSON, json)) - .build(); + .url(urlString + "/" + deposition_id + "/actions/newversion") + .addHeader("Authorization", "Bearer " + access_token) + .post(RequestBody.create(MEDIA_TYPE_JSON, json)) + .build(); try (Response response = httpClient.newCall(request).execute()) { @@ -205,9 +205,9 @@ public class ZenodoAPIClient implements Serializable { ZenodoModelList zenodoModelList = new Gson().fromJson(getPrevDepositions(), ZenodoModelList.class); - for(ZenodoModel zm : zenodoModelList){ - if (zm.getConceptrecid().equals(concept_rec_id)){ - deposition_id = zm.getId(); + for (ZenodoModel zm : zenodoModelList) { + if (zm.getConceptrecid().equals(concept_rec_id)) { + deposition_id = zm.getId(); return; } } @@ -220,11 +220,11 @@ public class ZenodoAPIClient implements Serializable { OkHttpClient httpClient = new OkHttpClient(); Request request = new Request.Builder() - .url(urlString) - .addHeader("Content-Type", "application/json") // add request headers - .addHeader("Authorization", "Bearer " + access_token) - .get() - .build(); + .url(urlString) + .addHeader("Content-Type", "application/json") // add request headers + .addHeader("Authorization", "Bearer " + access_token) + .get() + .build(); try (Response response = httpClient.newCall(request).execute()) { @@ -241,11 +241,11 @@ public class ZenodoAPIClient implements Serializable { OkHttpClient httpClient = new OkHttpClient(); Request request = new Request.Builder() - .url(url) - .addHeader("Content-Type", "application/json") // add request headers - .addHeader("Authorization", "Bearer " + access_token) - .get() - .build(); + .url(url) + .addHeader("Content-Type", "application/json") // add request headers + .addHeader("Authorization", "Bearer " + access_token) + .get() + .build(); try (Response response = httpClient.newCall(request).execute()) { @@ -255,12 +255,10 @@ public class ZenodoAPIClient implements Serializable { // Get response body ZenodoModel zenodoModel = new Gson().fromJson(response.body().string(), ZenodoModel.class); - return zenodoModel.getLinks().getBucket(); } } - } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/zenodo/ZenodoModelList.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/zenodo/ZenodoModelList.java index f2e02d6b70..b3b1507143 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/zenodo/ZenodoModelList.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/api/zenodo/ZenodoModelList.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.common.api.zenodo; import java.util.ArrayList; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Funder.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Funder.java index a49ca7aa49..16cab22cc5 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Funder.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Funder.java @@ -1,35 +1,36 @@ + package eu.dnetlib.dhp.schema.dump.oaf; import java.io.Serializable; public class Funder implements Serializable { - private String shortName; + private String shortName; - private String name; + private String name; - private String jurisdiction; + private String jurisdiction; - public String getJurisdiction() { - return jurisdiction; - } + public String getJurisdiction() { + return jurisdiction; + } - public void setJurisdiction(String jurisdiction) { - this.jurisdiction = jurisdiction; - } + public void setJurisdiction(String jurisdiction) { + this.jurisdiction = jurisdiction; + } - public String getShortName() { - return shortName; - } + public String getShortName() { + return shortName; + } - public void setShortName(String shortName) { - this.shortName = shortName; - } + public void setShortName(String shortName) { + this.shortName = shortName; + } - public String getName() { - return name; - } + public String getName() { + return name; + } - public void setName(String name) { - this.name = name; - } + public void setName(String name) { + this.name = name; + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java index 3db9cbe919..4a09f5a865 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Instance.java @@ -30,11 +30,11 @@ public class Instance implements Serializable { private String type; - private KeyValue hostedby; + private KeyValue hostedby; private List url; - private KeyValue collectedfrom; + private KeyValue collectedfrom; private String publicationdate;// dateofacceptance; diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Project.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Project.java index 70a3fbfbd1..f23d5a6703 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Project.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/Project.java @@ -1,44 +1,45 @@ + package eu.dnetlib.dhp.schema.dump.oaf; import java.io.Serializable; public class Project implements Serializable { - protected String id;// OpenAIRE id - protected String code; + protected String id;// OpenAIRE id + protected String code; - protected String acronym; + protected String acronym; - protected String title; + protected String title; - public String getId() { - return id; - } + public String getId() { + return id; + } - public void setId(String id) { - this.id = id; - } + public void setId(String id) { + this.id = id; + } - public String getCode() { - return code; - } + public String getCode() { + return code; + } - public void setCode(String code) { - this.code = code; - } + public void setCode(String code) { + this.code = code; + } - public String getAcronym() { - return acronym; - } + public String getAcronym() { + return acronym; + } - public void setAcronym(String acronym) { - this.acronym = acronym; - } + public void setAcronym(String acronym) { + this.acronym = acronym; + } - public String getTitle() { - return title; - } + public String getTitle() { + return title; + } - public void setTitle(String title) { - this.title = title; - } + public void setTitle(String title) { + this.title = title; + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/graph/Organization.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/graph/Organization.java index 317e66b856..579245c053 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/graph/Organization.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/oaf/graph/Organization.java @@ -10,7 +10,6 @@ import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; import eu.dnetlib.dhp.schema.dump.oaf.Qualifier; import eu.dnetlib.dhp.schema.dump.oaf.community.Project; - /** * To represent the generic organizaiton. It has the following parameters: * - private String legalshortname to store the legalshortname of the organizaiton diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java index 32fa65bb7e..41142d2851 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java @@ -366,48 +366,47 @@ public class ResultMapper implements Serializable { private static Instance getInstance(eu.dnetlib.dhp.schema.oaf.Instance i, boolean graph) { - Instance instance = new Instance(); + Instance instance = new Instance(); - if(!graph){ + if (!graph) { + instance + .setCollectedfrom( + KeyValue + .newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue())); + instance + .setHostedby( + KeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue())); + } + + Optional opAr = Optional + .ofNullable(i.getAccessright()); + if (opAr.isPresent()) { + if (Constants.accessRightsCoarMap.containsKey(opAr.get().getClassid())) { + String code = Constants.accessRightsCoarMap.get(opAr.get().getClassid()); instance - .setCollectedfrom( - KeyValue - .newInstance(i.getCollectedfrom().getKey(), i.getCollectedfrom().getValue())); - instance - .setHostedby( - KeyValue.newInstance(i.getHostedby().getKey(), i.getHostedby().getValue())); + .setAccessright( + AccessRight + .newInstance( + code, + Constants.coarCodeLabelMap.get(code), + Constants.COAR_ACCESS_RIGHT_SCHEMA)); } + } - Optional opAr = Optional - .ofNullable(i.getAccessright()); - if (opAr.isPresent()) { - if (Constants.accessRightsCoarMap.containsKey(opAr.get().getClassid())) { - String code = Constants.accessRightsCoarMap.get(opAr.get().getClassid()); - instance - .setAccessright( - AccessRight - .newInstance( - code, - Constants.coarCodeLabelMap.get(code), - Constants.COAR_ACCESS_RIGHT_SCHEMA)); - } - } - - - Optional - .ofNullable(i.getLicense()) - .ifPresent(value -> instance.setLicense(value.getValue())); - Optional - .ofNullable(i.getDateofacceptance()) - .ifPresent(value -> instance.setPublicationdate(value.getValue())); - Optional - .ofNullable(i.getRefereed()) - .ifPresent(value -> instance.setRefereed(value.getClassname())); - // .ifPresent(value -> instance.setRefereed(value.getValue())); - Optional - .ofNullable(i.getInstancetype()) - .ifPresent(value -> instance.setType(value.getClassname())); - Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value)); + Optional + .ofNullable(i.getLicense()) + .ifPresent(value -> instance.setLicense(value.getValue())); + Optional + .ofNullable(i.getDateofacceptance()) + .ifPresent(value -> instance.setPublicationdate(value.getValue())); + Optional + .ofNullable(i.getRefereed()) + .ifPresent(value -> instance.setRefereed(value.getClassname())); + // .ifPresent(value -> instance.setRefereed(value.getValue())); + Optional + .ofNullable(i.getInstancetype()) + .ifPresent(value -> instance.setType(value.getClassname())); + Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value)); return instance; } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java index 9ffc313990..23784cd66c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java @@ -3,11 +3,13 @@ package eu.dnetlib.dhp.oa.graph.dump; import java.io.Serializable; import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.api.MissingConceptDoiException; import eu.dnetlib.dhp.common.api.ZenodoAPIClient; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index 26359cca81..86421cff51 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -8,8 +8,6 @@ import java.io.StringReader; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.dump.oaf.graph.Funder; -import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -24,6 +22,8 @@ import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.*; import eu.dnetlib.dhp.schema.dump.oaf.graph.*; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Funder; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Project; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.OafEntity; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java index 50b5771e4e..f17e7c894c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java @@ -1,5 +1,4 @@ - package eu.dnetlib.dhp.oa.graph.dump.graph; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java index c1478d6439..42ad5634a7 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.graph.dump; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -16,7 +17,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.oa.graph.dump.community.CommunitySplit; import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/graph/ExtractRelationFromEntityTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/graph/ExtractRelationFromEntityTest.java index 109458355d..820a899ce0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/graph/ExtractRelationFromEntityTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/graph/ExtractRelationFromEntityTest.java @@ -38,7 +38,6 @@ public class ExtractRelationFromEntityTest { private static final Logger log = LoggerFactory .getLogger(ExtractRelationFromEntityTest.class); - @BeforeAll public static void beforeAll() throws IOException { workingDir = Files From a597a218ab739c100c608b3fe42a5cb3b00955cc Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 8 Sep 2020 16:39:40 +0200 Subject: [PATCH 07/18] * forall topics --- .../java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index c64cf6a7aa..080350c1cf 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -106,7 +106,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); From 028613b7513dcfbb25baa0531d3b02bf5a1e4d32 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 9 Sep 2020 15:32:06 +0200 Subject: [PATCH 08/18] remove old notifications --- .../dhp/broker/oa/IndexNotificationsJob.java | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 080350c1cf..b890ed3287 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -11,6 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; @@ -45,9 +46,8 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -68,7 +68,7 @@ public class IndexNotificationsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); - final long now = new Date().getTime(); + final long startTime = new Date().getTime(); final List subscriptions = listSubscriptions(brokerApiBaseUrl); @@ -77,7 +77,7 @@ public class IndexNotificationsJob { if (subscriptions.size() > 0) { final Dataset notifications = ClusterUtils .readPath(spark, eventsPath, Event.class) - .map(e -> generateNotifications(e, subscriptions, now), Encoders.bean(NotificationGroup.class)) + .map(e -> generateNotifications(e, subscriptions, startTime), Encoders.bean(NotificationGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Notification.class)); final JavaRDD inputRdd = notifications @@ -98,6 +98,10 @@ public class IndexNotificationsJob { log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); log.info("*** End indexing"); + + log.info("*** Deleting old notifications"); + final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); + log.info("*** Deleted notifications: " + message); } } @@ -106,8 +110,7 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -138,18 +141,15 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } @@ -197,6 +197,18 @@ public class IndexNotificationsJob { } + private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { + final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l; + final HttpDelete req = new HttpDelete(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + + } + private static String prepareForIndexing(final Notification n, final LongAccumulator acc) throws JsonProcessingException { acc.add(1); From 9b0c12f5d38bd028aa2c21c32323acd00ec6e1e2 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 11 Sep 2020 12:06:16 +0200 Subject: [PATCH 09/18] send notifications --- .../dhp/broker/oa/IndexNotificationsJob.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index b890ed3287..cb7acb46df 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa; +import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -102,6 +103,11 @@ public class IndexNotificationsJob { log.info("*** Deleting old notifications"); final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); log.info("*** Deleted notifications: " + message); + + log.info("*** sendNotifications (emails, ...)"); + sendNotifications(brokerApiBaseUrl, startTime - 1000); + log.info("*** ALL done."); + } } @@ -194,7 +200,6 @@ public class IndexNotificationsJob { .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); } } - } private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { @@ -206,7 +211,17 @@ public class IndexNotificationsJob { return IOUtils.toString(response.getEntity().getContent()); } } + } + private static String sendNotifications(final String brokerApiBaseUrl, final long l) throws IOException { + final String url = brokerApiBaseUrl + "/api/openaireBroker/notifications/send/" + l; + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } } private static String prepareForIndexing(final Notification n, final LongAccumulator acc) From cf2ce1a09bfb792a1b00c79727fa9490bfad1bd1 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 15 Sep 2020 15:58:03 +0200 Subject: [PATCH 10/18] code formatting --- .../dhp/broker/oa/IndexNotificationsJob.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index cb7acb46df..792a2354a4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,8 +47,9 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -116,7 +117,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -147,15 +149,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } From 51321c2701b22a617623f0e89e7a0a1a6b4bf09d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 17 Sep 2020 11:38:07 +0200 Subject: [PATCH 11/18] partition of events by opedoarId --- .../dhp/broker/oa/IndexNotificationsJob.java | 17 ++- .../broker/oa/PartitionEventsByDsIdJob.java | 113 +++++++++++++++ .../oozie_app/config-default.xml | 18 +++ .../notifications_only/oozie_app/workflow.xml | 137 ++++++++++++++++++ .../broker/oa/partial/oozie_app/workflow.xml | 15 +- pom.xml | 2 +- 6 files changed, 286 insertions(+), 16 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index cb7acb46df..792a2354a4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,8 +47,9 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -116,7 +117,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -147,15 +149,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java new file mode 100644 index 0000000000..0748624f77 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -0,0 +1,113 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.broker.api.ShortEventMessage; +import eu.dnetlib.broker.objects.OaBrokerEventPayload; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import scala.Tuple2; + +public class PartitionEventsByDsIdJob { + + private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class); + private static final String OPENDOAR_NSPREFIX = "opendoar____::"; + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; + log.info("partitionPath: {}", partitionPath); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + ClusterUtils + .readPath(spark, eventsPath, Event.class) + .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) + .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) + .map( + e -> new Tuple2<>( + StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX), + messageFromNotification(e)), + Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class))) + .write() + .partitionBy("_1") + .mode(SaveMode.Overwrite) + .json(partitionPath); + + }); + renameSubDirs(partitionPath); + + } + + private static void renameSubDirs(final String path) throws IOException { + final String prefix = "_1="; + final FileSystem fs = FileSystem.get(new Configuration()); + + log.info("** Renaming subdirs of " + path); + for (final FileStatus fileStatus : fs.listStatus(new Path(path))) { + if (fileStatus.isDirectory()) { + final Path oldPath = fileStatus.getPath(); + final String oldName = oldPath.getName(); + if (oldName.startsWith(prefix)) { + final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix)); + log.info(" * " + oldPath.getName() + " -> " + newPath.getName()); + fs.rename(oldPath, newPath); + } + } + } + } + + private static ShortEventMessage messageFromNotification(final Event e) { + final Gson gson = new Gson(); + + final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class); + + final ShortEventMessage res = new ShortEventMessage(); + + res.setOriginalId(payload.getResult().getOriginalId()); + res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); + res.setTopic(e.getTopic()); + res.setTrust(payload.getTrust()); + res.generateMessageFromObject(payload.getHighlight()); + + return res; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml new file mode 100644 index 0000000000..2e0ed9aeea --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml new file mode 100644 index 0000000000..f629c2101e --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -0,0 +1,137 @@ + + + + + graphInputPath + the path where the graph is stored + + + workingPath + the path where the the generated data will be stored + + + datasourceIdWhitelist + - + a white list (comma separeted, - for empty list) of datasource ids + + + datasourceTypeWhitelist + - + a white list (comma separeted, - for empty list) of datasource types + + + datasourceIdBlacklist + - + a black list (comma separeted, - for empty list) of datasource ids + + + esEventIndexName + the elasticsearch index name for events + + + esNotificationsIndexName + the elasticsearch index name for notifications + + + esIndexHost + the elasticsearch host + + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) + + + brokerApiBaseUrl + the url of the broker service api + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + IndexNotificationsOnESJob + eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esNotificationsIndexName} + --esHost${esIndexHost} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index f629c2101e..a9741a3074 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -99,38 +99,35 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexNotificationsOnESJob - eu.dnetlib.dhp.broker.oa.IndexNotificationsJob + PartitionEventsByDsIdJob + eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob dhp-broker-events-${projectVersion}.jar + --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} --workingPath${workingPath} - --index${esNotificationsIndexName} - --esHost${esIndexHost} - --brokerApiBaseUrl${brokerApiBaseUrl} - diff --git a/pom.xml b/pom.xml index e88e1d51b7..52edd497f4 100644 --- a/pom.xml +++ b/pom.xml @@ -663,7 +663,7 @@ 3.3.3 3.4.2 [2.12,3.0) - 3.1.0 + 3.1.1 7.5.0 4.7.2 1.1 From 9e681609fdd03a0549ec956587b362a9a2d3241b Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 17 Sep 2020 15:51:22 +0200 Subject: [PATCH 12/18] stats to sql file --- .../dhp/broker/oa/GenerateStatsJob.java | 50 ++++++++++++++++--- .../aggregators/stats/DatasourceStats.java | 28 ++++++----- .../aggregators/stats/StatsAggregator.java | 6 ++- .../broker/oa/partial/oozie_app/workflow.xml | 25 ++++++++-- .../dnetlib/dhp/broker/oa/stats_params.json | 32 ++++++++++++ 5 files changed, 113 insertions(+), 28 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 8a7229b647..8a9009f324 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -3,11 +3,16 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.IOException; import java.util.Optional; +import java.util.Properties; import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; @@ -29,7 +34,7 @@ public class GenerateStatsJob { IOUtils .toString( GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -43,21 +48,50 @@ public class GenerateStatsJob { final String eventsPath = parser.get("workingPath") + "/events"; log.info("eventsPath: {}", eventsPath); - final String statsPath = parser.get("workingPath") + "/stats"; - log.info("stats: {}", statsPath); + final String dbUrl = parser.get("dbUrl"); + log.info("dbUrl: {}", dbUrl); + + final String dbUser = parser.get("dbUser"); + log.info("dbUser: {}", dbUser); + + final String dbPassword = parser.get("dbPassword"); + log.info("dbPassword: {}", "***"); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); final TypedColumn aggr = new StatsAggregator().toColumn(); + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPassword); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { - final Dataset stats = ClusterUtils + ClusterUtils .readPath(spark, eventsPath, Event.class) - .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .groupByKey(e -> e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(DatasourceStats.class)); + .map(t -> t._2, Encoders.bean(DatasourceStats.class)) + .write() + .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties); + + log.info("*** updateStats"); + updateStats(brokerApiBaseUrl); + log.info("*** ALL done."); - ClusterUtils.save(stats, statsPath, DatasourceStats.class, null); }); } + private static String updateStats(final String brokerApiBaseUrl) throws IOException { + final String url = brokerApiBaseUrl + "/api/openaireBroker/stats/update"; + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java index 8b628809db..979bac2da6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java @@ -2,8 +2,6 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.stats; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; public class DatasourceStats implements Serializable { @@ -15,7 +13,8 @@ public class DatasourceStats implements Serializable { private String id; private String name; private String type; - private Map topics = new HashMap<>(); + private String topic; + private long size = 0l; public String getId() { return id; @@ -41,21 +40,24 @@ public class DatasourceStats implements Serializable { this.type = type; } - public Map getTopics() { - return topics; + public String getTopic() { + return topic; } - public void setTopics(final Map topics) { - this.topics = topics; + public void setTopic(final String topic) { + this.topic = topic; } - public void incrementTopic(final String topic, final long inc) { - if (topics.containsKey(topic)) { - topics.put(topic, topics.get(topic) + inc); - } else { - topics.put(topic, inc); - } + public long getSize() { + return size; + } + public void setSize(final long size) { + this.size = size; + } + + public void incrementSize(final long inc) { + this.size = this.size + inc; } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java index 5aa6698e39..240e2d2112 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java @@ -25,7 +25,8 @@ public class StatsAggregator extends Aggregator stats0.incrementTopic(e.getKey(), e.getValue())); + stats0.incrementSize(stats1.getSize()); return stats0; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index a9741a3074..407b9f42f7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -44,6 +44,18 @@ brokerApiBaseUrl the url of the broker service api + + brokerDbUrl + the url of the broker database + + + brokerDbUser + the user of the broker database + + + brokerDbPassword + the password of the broker database + sparkDriverMemory memory for driver process @@ -99,18 +111,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - PartitionEventsByDsIdJob - eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob + GenerateStatsJob + eu.dnetlib.dhp.broker.oa.GenerateStatsJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -122,8 +134,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --graphPath${graphInputPath} --workingPath${workingPath} + --dbUrl${brokerDbUrl} + --dbUser${brokerDbUser} + --dbPassword${brokerDbPassword} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json new file mode 100644 index 0000000000..15d7d251f6 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the working path", + "paramRequired": true + }, + { + "paramName": "dburl", + "paramLongName": "dbUrl", + "paramDescription": "the broker database url", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "dbUser", + "paramDescription": "the broker database user", + "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "dbPassword", + "paramDescription": "the broker database password", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] From 9e3e93c6b6a919878293adbfe27406be7abd5677 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 10:39:16 +0200 Subject: [PATCH 13/18] setting the correct issn type in the datasource.journal element --- .../dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java | 2 +- .../test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 87c935d835..1e7b56ee9c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -577,7 +577,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null; if (issn != null || eissn != null || lissn != null) { - return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); + return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info); } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml new file mode 100644 index 0000000000..e69de29bb2 From 9a7e72d528d093f2ff338df32c9e7f99f87158e1 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 10:42:47 +0200 Subject: [PATCH 14/18] using concat_ws to join textual columns from PSQL. When using || to perform the concatenation, Null columns makes the operation result to be Null --- .../resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index 43b0f8f4b1..d6eae3b555 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - d.issn || ' @@@ ' || d.eissn || ' @@@ ' || d.lissn AS journal + concat_ws(' @@@ ', issn, lissn, eissn) AS journal FROM dsm_datasources d From 42f55395c80dca6c4cfb09c554b7afe402415ee0 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 12:09:58 +0200 Subject: [PATCH 15/18] fixed order of the ISSNs returned by the SQL query --- .../resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index d6eae3b555..3033b9f879 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - concat_ws(' @@@ ', issn, lissn, eissn) AS journal + concat_ws(' @@@ ', issn, eissn, lissn) AS journal FROM dsm_datasources d From fb22f4d70b08ca7fbd3b1572f36ee9addba1b00a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 12:10:59 +0200 Subject: [PATCH 16/18] included values for projects fundedamount and totalcost fields in the mapping tests. Swapped expected and actual values in junit test assertions --- .../raw/MigrateDbEntitiesApplicationTest.java | 62 ++++++++++++------- .../graph/raw/projects_resultset_entry.json | 4 +- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 22fcb36c98..e8059f5069 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -73,12 +73,16 @@ public class MigrateDbEntitiesApplicationTest { final Datasource ds = (Datasource) list.get(0); assertValidId(ds.getId()); assertValidId(ds.getCollectedfrom().get(0).getKey()); - assertEquals(ds.getOfficialname().getValue(), getValueAsString("officialname", fields)); - assertEquals(ds.getEnglishname().getValue(), getValueAsString("englishname", fields)); - assertEquals(ds.getContactemail().getValue(), getValueAsString("contactemail", fields)); - assertEquals(ds.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); - assertEquals(ds.getNamespaceprefix().getValue(), getValueAsString("namespaceprefix", fields)); - assertEquals(ds.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("officialname", fields), ds.getOfficialname().getValue()); + assertEquals(getValueAsString("englishname", fields), ds.getEnglishname().getValue()); + assertEquals(getValueAsString("contactemail", fields), ds.getContactemail().getValue()); + assertEquals(getValueAsString("websiteurl", fields), ds.getWebsiteurl().getValue()); + assertEquals(getValueAsString("namespaceprefix", fields), ds.getNamespaceprefix().getValue()); + assertEquals(getValueAsString("collectedfromname", fields), ds.getCollectedfrom().get(0).getValue()); + assertEquals(getValueAsString("officialname", fields), ds.getJournal().getName()); + assertEquals("2579-5449", ds.getJournal().getIssnPrinted()); + assertEquals("2597-6540", ds.getJournal().getIssnOnline()); + assertEquals(null, ds.getJournal().getIssnLinking()); } @Test @@ -92,9 +96,11 @@ public class MigrateDbEntitiesApplicationTest { final Project p = (Project) list.get(0); assertValidId(p.getId()); assertValidId(p.getCollectedfrom().get(0).getKey()); - assertEquals(p.getAcronym().getValue(), getValueAsString("acronym", fields)); - assertEquals(p.getTitle().getValue(), getValueAsString("title", fields)); - assertEquals(p.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("acronym", fields), p.getAcronym().getValue()); + assertEquals(getValueAsString("title", fields), p.getTitle().getValue()); + assertEquals(getValueAsString("collectedfromname", fields), p.getCollectedfrom().get(0).getValue()); + assertEquals(getValueAsFloat("fundedamount", fields), p.getFundedamount()); + assertEquals(getValueAsFloat("totalcost", fields), p.getTotalcost()); } @Test @@ -110,14 +116,14 @@ public class MigrateDbEntitiesApplicationTest { final Organization o = (Organization) list.get(0); assertValidId(o.getId()); assertValidId(o.getCollectedfrom().get(0).getKey()); - assertEquals(o.getLegalshortname().getValue(), getValueAsString("legalshortname", fields)); - assertEquals(o.getLegalname().getValue(), getValueAsString("legalname", fields)); - assertEquals(o.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); - assertEquals(o.getCountry().getClassid(), getValueAsString("country", fields).split("@@@")[0]); - assertEquals(o.getCountry().getClassname(), getValueAsString("country", fields).split("@@@")[0]); - assertEquals(o.getCountry().getSchemeid(), getValueAsString("country", fields).split("@@@")[1]); - assertEquals(o.getCountry().getSchemename(), getValueAsString("country", fields).split("@@@")[1]); - assertEquals(o.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("legalshortname", fields), o.getLegalshortname().getValue()); + assertEquals(getValueAsString("legalname", fields), o.getLegalname().getValue()); + assertEquals(getValueAsString("websiteurl", fields), o.getWebsiteurl().getValue()); + assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassid()); + assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassname()); + assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemeid()); + assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemename()); + assertEquals(getValueAsString("collectedfromname", fields), o.getCollectedfrom().get(0).getValue()); } @Test @@ -322,14 +328,22 @@ public class MigrateDbEntitiesApplicationTest { } private String getValueAsString(final String name, final List fields) { + return getValueAs(name, fields); + } + + private Float getValueAsFloat(final String name, final List fields) { + return new Float(getValueAs(name, fields).toString()); + } + + private T getValueAs(final String name, final List fields) { return fields - .stream() - .filter(f -> f.getField().equals(name)) - .map(TypedField::getValue) - .filter(Objects::nonNull) - .map(o -> o.toString()) - .findFirst() - .get(); + .stream() + .filter(f -> f.getField().equals(name)) + .map(TypedField::getValue) + .filter(Objects::nonNull) + .map(o -> (T) o) + .findFirst() + .get(); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json index d6109cac1f..a25215ca36 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json @@ -142,12 +142,12 @@ { "field": "totalcost", "type": "double", - "value": null + "value": 157846 }, { "field": "fundedamount", "type": "double", - "value": null + "value": 157846 }, { "field": "collectedfromid", From 27df1cea6d5c658c77156f0324a5b0f4e9189dbb Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 12:16:00 +0200 Subject: [PATCH 17/18] code formatting --- .../raw/MigrateDbEntitiesApplicationTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index e8059f5069..011cc18e6e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -337,13 +337,13 @@ public class MigrateDbEntitiesApplicationTest { private T getValueAs(final String name, final List fields) { return fields - .stream() - .filter(f -> f.getField().equals(name)) - .map(TypedField::getValue) - .filter(Objects::nonNull) - .map(o -> (T) o) - .findFirst() - .get(); + .stream() + .filter(f -> f.getField().equals(name)) + .map(TypedField::getValue) + .filter(Objects::nonNull) + .map(o -> (T) o) + .findFirst() + .get(); } } From 044d3a021418e8b53cc15a1c8dd3b93bd89b0816 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 13:48:58 +0200 Subject: [PATCH 18/18] fixed query used to load datasources in the Graph --- .../eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index 3033b9f879..7ca672835b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -3,7 +3,7 @@ SELECT d.id || array_agg(distinct di.pid) AS identities, d.officialname AS officialname, d.englishname AS englishname, - d.contactemail AS contactemail, + d.contactemail AS contactemail, CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1']) THEN @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - concat_ws(' @@@ ', issn, eissn, lissn) AS journal + concat_ws(' @@@ ', d.issn, d.eissn, d.lissn) AS journal FROM dsm_datasources d