diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 0cb0d7801..429eb7d11 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.broker.model; import java.text.ParseException; -import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -19,16 +18,12 @@ public class EventFactory { private final static String PRODUCER_ID = "OpenAIRE"; - private static final int TTH_DAYS = 365; - private final static String[] DATE_PATTERNS = { "yyyy-MM-dd" }; public static Event newBrokerEvent(final UpdateInfo updateInfo) { - final long now = new Date().getTime(); - final Event res = new Event(); final MappedFields map = createMapFromResult(updateInfo); @@ -44,8 +39,8 @@ public class EventFactory { res.setPayload(updateInfo.asBrokerPayload().toJSON()); res.setMap(map); res.setTopic(updateInfo.getTopicPath()); - res.setCreationDate(now); - res.setExpiryDate(calculateExpiryDate(now)); + res.setCreationDate(0l); + res.setExpiryDate(Long.MAX_VALUE); res.setInstantMessage(false); return res; @@ -96,7 +91,9 @@ public class EventFactory { return map; } - private static String calculateEventId(final String topic, final String dsId, final String publicationId, + private static String calculateEventId(final String topic, + final String dsId, + final String publicationId, final String value) { return "event-" + DigestUtils.md5Hex(topic).substring(0, 4) + "-" @@ -105,10 +102,6 @@ public class EventFactory { + DigestUtils.md5Hex(value).substring(0, 5); } - private static long calculateExpiryDate(final long now) { - return now + TTH_DAYS * 24 * 60 * 60 * 1000; - } - private static long parseDateTolong(final String date) { if (StringUtils.isBlank(date)) { return -1; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index a51601cd7..8a7229b64 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -28,7 +28,7 @@ public class GenerateStatsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString( - IndexOnESJob.class + GenerateStatsJob.class .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java new file mode 100644 index 000000000..17451d067 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -0,0 +1,100 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; +import eu.dnetlib.dhp.broker.oa.util.aggregators.subset.EventSubsetAggregator; + +public class IndexEventSubsetJob { + + private static final Logger log = LoggerFactory.getLogger(IndexEventSubsetJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + IndexEventSubsetJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String index = parser.get("index"); + log.info("index: {}", index); + + final String indexHost = parser.get("esHost"); + log.info("indexHost: {}", indexHost); + + final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); + log.info("maxEventsForTopic: {}", maxEventsForTopic); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final TypedColumn aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); + + final long now = new Date().getTime(); + + final Dataset subset = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .groupByKey(e -> e.getTopic() + '@' + e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(EventGroup.class)) + .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); + + final JavaRDD inputRdd = subset + .map(e -> prepareEventForIndexing(e, now, total), Encoders.STRING()) + .javaRDD(); + + final Map esCfg = new HashMap<>(); + // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); + + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + esCfg.put("es.nodes.wan.only", "true"); + + JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + } + + private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc) + throws JsonProcessingException { + acc.add(1); + + e.setCreationDate(creationDate); + e.setExpiryDate(Long.MAX_VALUE); + + return new ObjectMapper().writeValueAsString(e); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 0dc34cc42..762bfbb90 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -20,6 +20,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +@Deprecated public class IndexOnESJob { private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java new file mode 100644 index 000000000..cd403ade0 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/subset/EventSubsetAggregator.java @@ -0,0 +1,67 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.subset; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.EventGroup; + +public class EventSubsetAggregator extends Aggregator { + + /** + * + */ + private static final long serialVersionUID = -678071078823059805L; + + private final int maxEventsForTopic; + + public EventSubsetAggregator(final int maxEventsForTopic) { + this.maxEventsForTopic = maxEventsForTopic; + } + + @Override + public EventGroup zero() { + return new EventGroup(); + } + + @Override + public EventGroup reduce(final EventGroup g, final Event e) { + if (g.getData().size() < maxEventsForTopic) { + g.getData().add(e); + } + return g; + } + + @Override + public EventGroup merge(final EventGroup g0, final EventGroup g1) { + final int missing = maxEventsForTopic - g0.getData().size(); + + if (missing > 0) { + if (g1.getData().size() < missing) { + g0.getData().addAll(g1.getData()); + } else { + g0.getData().addAll(g1.getData().subList(0, missing)); + } + } + + return g0; + } + + @Override + public EventGroup finish(final EventGroup g) { + return g; + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(EventGroup.class); + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(EventGroup.class); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 568d5dc5a..a10794f63 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -25,13 +25,17 @@ a black list (comma separeted, - for empty list) of datasource ids - esIndexName + esEventIndexName the elasticsearch index name esIndexHost the elasticsearch host + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) + sparkDriverMemory memory for driver process @@ -423,16 +427,16 @@ --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} - + - + yarn cluster - IndexOnESJob - eu.dnetlib.dhp.broker.oa.IndexOnESJob + IndexEventSubsetOnESJob + eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob dhp-broker-events-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -445,8 +449,9 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esIndexName} + --index${esEventIndexName} --esHost${esIndexHost} + --maxEventsForTopic${maxIndexedEventsForDsAndTopic} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json new file mode 100644 index 000000000..72703ae33 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "o", + "paramLongName": "workingPath", + "paramDescription": "the workinh path", + "paramRequired": true + }, + { + "paramName": "idx", + "paramLongName": "index", + "paramDescription": "the ES index", + "paramRequired": true + }, + { + "paramName": "es", + "paramLongName": "esHost", + "paramDescription": "the ES host", + "paramRequired": true + }, + { + "paramName": "n", + "paramLongName": "maxEventsForTopic", + "paramDescription": "the max number of events for each couple (ds/topic)", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 3c43c0b1b..306343dbe 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -8,6 +8,33 @@ workingPath the path where the the generated data will be stored + + + datasourceIdWhitelist + - + a white list (comma separeted, - for empty list) of datasource ids + + + datasourceTypeWhitelist + - + a white list (comma separeted, - for empty list) of datasource types + + + datasourceIdBlacklist + - + a black list (comma separeted, - for empty list) of datasource ids + + + esEventIndexName + the elasticsearch index name + + + esIndexHost + the elasticsearch host + + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) sparkDriverMemory @@ -64,18 +91,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - IndexOnESJob - eu.dnetlib.dhp.broker.oa.IndexOnESJob + IndexEventSubsetOnESJob + eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob dhp-broker-events-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} @@ -88,8 +115,9 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --index${esIndexName} + --index${esEventIndexName} --esHost${esIndexHost} + --maxEventsForTopic${maxIndexedEventsForDsAndTopic}