From 6f136734641b167fbbf9325255e1b69bdea8ef7d Mon Sep 17 00:00:00 2001
From: "michele.artini" <michele.artini@isti.cnr.it>
Date: Mon, 29 Jun 2020 16:33:32 +0200
Subject: [PATCH] accumulators

---
 dhp-workflows/dhp-broker-events/pom.xml       | 10 ++-
 .../dhp/broker/oa/GenerateEventsJob.java      | 21 +++++-
 .../dnetlib/dhp/broker/oa/IndexOnESJob.java   | 71 +++++++++++++++++++
 .../dhp/broker/oa/matchers/UpdateMatcher.java | 17 ++++-
 .../dhp/broker/oa/util/EventFinder.java       | 17 +++--
 .../oa/generate_all/oozie_app/workflow.xml    | 34 ++++++++-
 .../eu/dnetlib/dhp/broker/oa/index_es.json    | 20 ++++++
 .../broker/oa/partial/oozie_app/workflow.xml  | 29 +++++++-
 8 files changed, 207 insertions(+), 12 deletions(-)
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
 create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json

diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 424015a3c..e3182c259 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -1,5 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<parent>
 		<artifactId>dhp-workflows</artifactId>
 		<groupId>eu.dnetlib.dhp</groupId>
@@ -24,7 +26,11 @@
 		<dependency>
 			<groupId>org.apache.spark</groupId>
 			<artifactId>spark-sql_2.11</artifactId>
 		</dependency>
-
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>elasticsearch-hadoop</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
index dbe2fdd47..30e77be50 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -3,14 +3,18 @@ package eu.dnetlib.dhp.broker.oa;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
 import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
 import eu.dnetlib.dhp.broker.oa.util.EventFinder;
 import eu.dnetlib.dhp.broker.oa.util.EventGroup;
@@ -66,12 +71,15 @@ public class GenerateEventsJob {
 
 			ClusterUtils.removeDir(spark, eventsPath);
 
+			final Map<String, LongAccumulator> accumulators = prepareAccumulators(spark.sparkContext());
+
 			final Dataset<ResultGroup> groups = ClusterUtils
 				.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
 
 			final Dataset<Event> events = groups
 				.map(
-					(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
+					(MapFunction<ResultGroup, EventGroup>) g -> EventFinder
+						.generateEvents(g, dedupConfig, accumulators),
 					Encoders.bean(EventGroup.class))
 				.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
 
@@ -81,6 +89,17 @@ public class GenerateEventsJob {
 
 	}
 
+	public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
+
+		return EventFinder
+			.getMatchers()
+			.stream()
+			.map(UpdateMatcher::accumulatorName)
+			.distinct()
+			.collect(Collectors.toMap(s -> s, s -> sc.longAccumulator(s)));
+
+	}
+
 	private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
 
 		final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
new file mode 100644
index 000000000..be8d14c5f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
@@ -0,0 +1,71 @@
+
+package eu.dnetlib.dhp.broker.oa;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.broker.model.Event;
+
+public class IndexOnESJob {
+
+	private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class);
+
+	public static void main(final String[] args) throws Exception {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					IndexOnESJob.class
+						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json")));
+		parser.parseArgument(args);
+
+		final SparkConf conf = new SparkConf();
+
+		final String eventsPath = parser.get("workingPath") + "/events";
+		log.info("eventsPath: {}", eventsPath);
+
+		final String index = parser.get("index");
+		log.info("index: {}", index);
+
+		final String indexHost = parser.get("esHost");
+		log.info("indexHost: {}", indexHost);
+
+		final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+
+		final JavaRDD<String> inputRdd = spark
+			.read()
+			.load(eventsPath)
+			.as(Encoders.bean(Event.class))
+			.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
+			.javaRDD();
+
+		final Map<String, String> esCfg = new HashMap<>();
+		// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
+		esCfg.put("es.nodes", indexHost);
+		esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
+		esCfg.put("es.batch.write.retry.count", "8");
+		esCfg.put("es.batch.write.retry.wait", "60s");
+		esCfg.put("es.batch.size.entries", "200");
+		esCfg.put("es.nodes.wan.only", "true");
+
+		JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
+	}
+
+	private static String eventAsJsonString(final Event f) throws JsonProcessingException {
+		return new ObjectMapper().writeValueAsString(f);
+	}
+
+}

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index 7f82f9a2b..0618ff7e3 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -12,6 +12,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.util.LongAccumulator;
 
 import eu.dnetlib.broker.objects.OaBrokerMainEntity;
 import eu.dnetlib.dhp.broker.model.Topic;
@@ -36,7 +37,8 @@ public abstract class UpdateMatcher<T> {
 
 	public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
 		final Collection<OaBrokerMainEntity> others,
-		final DedupConfig dedupConfig) {
+		final DedupConfig dedupConfig,
+		final Map<String, LongAccumulator> accumulators) {
 
 		final Map<String, List<UpdateInfo<T>>> infoMap = new HashMap<>();
 
@@ -67,9 +69,10 @@ public abstract class UpdateMatcher<T> {
 		if (values.isEmpty()) {
 			return new ArrayList<>();
 		} else if (values.size() > maxNumber) {
-			System.err.println("Too many events (" + values.size() + ") matched by " + getClass().getSimpleName());
+			incrementAccumulator(accumulators, maxNumber);
 			return values.subList(0, maxNumber);
 		} else {
+			incrementAccumulator(accumulators, values.size());
 			return values;
 		}
 	}
@@ -100,4 +103,14 @@ public abstract class UpdateMatcher<T> {
 		return highlightToStringFunction;
 	}
 
+	public String accumulatorName() {
+		return "event_matcher_" + getClass().getSimpleName().toLowerCase();
+	}
+
+	public void incrementAccumulator(final Map<String, LongAccumulator> accumulators, final long n) {
+		if (accumulators.containsKey(accumulatorName())) {
+			accumulators.get(accumulatorName()).add(n);
+		}
+	}
+
 }

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index 6dfca4fcb..5ed55247b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -3,6 +3,9 @@ package eu.dnetlib.dhp.broker.oa.util;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.util.LongAccumulator;
 
 import eu.dnetlib.broker.objects.OaBrokerMainEntity;
 import eu.dnetlib.dhp.broker.model.EventFactory;
@@ -35,7 +38,7 @@ import eu.dnetlib.pace.config.DedupConfig;
 
 public class EventFinder {
 
-	private static List<UpdateMatcher<?>> matchers = new ArrayList<>();
+	private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
 	static {
 		matchers.add(new EnrichMissingAbstract());
 		matchers.add(new EnrichMissingAuthorOrcid());
@@ -47,7 +50,7 @@ public class EventFinder {
 		matchers.add(new EnrichMorePid());
 		matchers.add(new EnrichMoreSubject());
 
-		// // Advanced matchers
+		// Advanced matchers
 		matchers.add(new EnrichMissingProject());
 		matchers.add(new EnrichMoreProject());
 		matchers.add(new EnrichMissingSoftware());
@@ -65,12 +68,14 @@ public class EventFinder {
 		matchers.add(new EnrichMissingAbstract());
 	}
 
-	public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
+	public static EventGroup generateEvents(final ResultGroup results,
+		final DedupConfig dedupConfig,
+		final Map<String, LongAccumulator> accumulators) {
 		final List<UpdateInfo<?>> list = new ArrayList<>();
 
 		for (final OaBrokerMainEntity target : results.getData()) {
 			for (final UpdateMatcher<?> matcher : matchers) {
-				list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
+				list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
 			}
 		}
 
@@ -83,4 +88,8 @@ public class EventFinder {
 		return events;
 	}
 
+	public static List<UpdateMatcher<?>> getMatchers() {
+		return matchers;
+	}
+
 }

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 8752200ff..b8d12c42c 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -17,7 +17,14 @@
 			<name>dedupConfProfId</name>
 			<description>the id of a valid Dedup Configuration Profile</description>
 		</property>
-
+		<property>
+			<name>esIndexName</name>
+			<description>the elasticsearch index name</description>
+		</property>
+		<property>
+			<name>esIndexHost</name>
+			<description>the elasticsearch host</description>
+		</property>
 		<property>
 			<name>sparkDriverMemory</name>
 			<description>memory for driver process</description>
@@ -359,6 +366,31 @@
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
 			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
 		</spark>
 		<ok to="End"/>
 		<error to="Kill"/>
 	</action>
+
+	<action name="index_es">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>IndexOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--index</arg><arg>${esIndexName}</arg>
+			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+		</spark>
+		<ok to="End"/>
+		<error to="Kill"/>
+	</action>

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json
new file mode 100644
index 000000000..ac1dbf786
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json
@@ -0,0 +1,20 @@
+[
+	{
+		"paramName": "o",
+		"paramLongName": "workingPath",
+		"paramDescription": "the working path",
+		"paramRequired": true
+	},
+	{
+		"paramName": "idx",
+		"paramLongName": "index",
+		"paramDescription": "the ES index",
+		"paramRequired": true
+	},
+	{
+		"paramName": "es",
+		"paramLongName": "esHost",
+		"paramDescription": "the ES host",
+		"paramRequired": true
+	}
+]

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index fd68bfec2..f10c5d804 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-<workflow-app name="…" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="…" xmlns="uri:oozie:workflow:0.5">
 	<parameters>
 		<property>
 			<name>workingPath</name>
@@ -80,7 +80,7 @@
 
 	</kill>
 
-	<action name="…">
+	<action name="…">
 		<spark xmlns="uri:oozie:spark-action:0.2">
 			<master>yarn</master>
 			<mode>cluster</mode>
@@ -101,6 +101,31 @@
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
 			<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
 		</spark>
 		<ok to="End"/>
 		<error to="Kill"/>
 	</action>
+
+	<action name="index_es">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>IndexOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-cores=${sparkExecutorCores}
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--index</arg><arg>${esIndexName}</arg>
+			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+		</spark>
+		<ok to="End"/>
+		<error to="Kill"/>
+	</action>
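
As a reading aid, here is a minimal, self-contained sketch (not part of the patch) of the accumulator pattern the commit threads through GenerateEventsJob, EventFinder and UpdateMatcher: the driver registers one named LongAccumulator per matcher, the matcher code adds to it while events are generated, and the totals become visible on the driver (and in the Spark UI, because the accumulators are named) once an action completes. The two matcher names and the local master are illustrative assumptions, not values taken from the patch.

	import java.util.Map;
	import java.util.stream.Collectors;
	import java.util.stream.Stream;

	import org.apache.spark.sql.SparkSession;
	import org.apache.spark.util.LongAccumulator;

	public class AccumulatorPatternSketch {

		public static void main(final String[] args) {
			final SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

			// Driver side: one named accumulator per matcher, keyed by name,
			// mirroring GenerateEventsJob.prepareAccumulators(...).
			final Map<String, LongAccumulator> accumulators = Stream
				.of("event_matcher_enrichmissingabstract", "event_matcher_enrichmorepid")
				.distinct()
				.collect(Collectors.toMap(s -> s, s -> spark.sparkContext().longAccumulator(s)));

			// This is what UpdateMatcher.incrementAccumulator does inside a task;
			// here it runs directly on the driver for brevity. The containsKey
			// guard makes a missing (or empty) map harmless.
			final String name = "event_matcher_enrichmorepid";
			if (accumulators.containsKey(name)) {
				accumulators.get(name).add(3L);
			}

			// After an action completes, the driver can read the totals.
			accumulators.forEach((k, acc) -> System.out.println(k + " = " + acc.value()));

			spark.stop();
		}
	}

Because incrementAccumulator checks containsKey before adding, searchUpdatesForRecord can also be invoked with an empty map, which keeps the matchers easy to exercise in isolation.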