accumulators

This commit is contained in:
Michele Artini 2020-06-29 16:33:32 +02:00
parent a6ea432435
commit 6f13673464
8 changed files with 207 additions and 12 deletions

View File

@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
@ -24,7 +26,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>

View File

@ -3,14 +3,18 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -18,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
@ -66,12 +71,15 @@ public class GenerateEventsJob {
ClusterUtils.removeDir(spark, eventsPath);
final Map<String, LongAccumulator> accumulators = prepareAccumulators(spark.sparkContext());
final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
final Dataset<Event> events = groups
.map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder
.generateEvents(g, dedupConfig, accumulators),
Encoders.bean(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
@ -81,6 +89,17 @@ public class GenerateEventsJob {
}
public static Map<String, LongAccumulator> prepareAccumulators(final SparkContext sc) {
return EventFinder
.getMatchers()
.stream()
.map(UpdateMatcher::accumulatorName)
.distinct()
.collect(Collectors.toMap(s -> s, s -> sc.longAccumulator(s)));
}
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);

View File

@ -0,0 +1,71 @@
package eu.dnetlib.dhp.broker.oa;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
public class IndexOnESJob {
private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
IndexOnESJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
log.info("index: {}", index);
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final JavaRDD<String> inputRdd = spark
.read()
.load(eventsPath)
.as(Encoders.bean(Event.class))
.map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
}
private static String eventAsJsonString(final Event f) throws JsonProcessingException {
return new ObjectMapper().writeValueAsString(f);
}
}

View File

@ -12,6 +12,7 @@ import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic;
@ -36,7 +37,8 @@ public abstract class UpdateMatcher<T> {
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
final Collection<OaBrokerMainEntity> others,
final DedupConfig dedupConfig) {
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
@ -67,9 +69,10 @@ public abstract class UpdateMatcher<T> {
if (values.isEmpty()) {
return new ArrayList<>();
} else if (values.size() > maxNumber) {
System.err.println("Too many events (" + values.size() + ") matched by " + getClass().getSimpleName());
incrementAccumulator(accumulators, maxNumber);
return values.subList(0, maxNumber);
} else {
incrementAccumulator(accumulators, values.size());
return values;
}
}
@ -100,4 +103,14 @@ public abstract class UpdateMatcher<T> {
return highlightToStringFunction;
}
public String accumulatorName() {
return "event_matcher_" + getClass().getSimpleName().toLowerCase();
}
public void incrementAccumulator(final Map<String, LongAccumulator> accumulators, final long n) {
if (accumulators.containsKey(accumulatorName())) {
accumulators.get(accumulatorName()).add(n);
}
}
}

View File

@ -3,6 +3,9 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory;
@ -35,7 +38,7 @@ import eu.dnetlib.pace.config.DedupConfig;
public class EventFinder {
private static List<UpdateMatcher<?>> matchers = new ArrayList<>();
private static final List<UpdateMatcher<?>> matchers = new ArrayList<>();
static {
matchers.add(new EnrichMissingAbstract());
matchers.add(new EnrichMissingAuthorOrcid());
@ -47,7 +50,7 @@ public class EventFinder {
matchers.add(new EnrichMorePid());
matchers.add(new EnrichMoreSubject());
// // Advanced matchers
// Advanced matchers
matchers.add(new EnrichMissingProject());
matchers.add(new EnrichMoreProject());
matchers.add(new EnrichMissingSoftware());
@ -65,12 +68,14 @@ public class EventFinder {
matchers.add(new EnrichMissingAbstract());
}
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
public static EventGroup generateEvents(final ResultGroup results,
final DedupConfig dedupConfig,
final Map<String, LongAccumulator> accumulators) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) {
for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators));
}
}
@ -83,4 +88,8 @@ public class EventFinder {
return events;
}
public static List<UpdateMatcher<?>> getMatchers() {
return matchers;
}
}

View File

@ -17,7 +17,14 @@
<name>dedupConfProfId</name>
<description>the id of a valid Dedup Configuration Profile</description>
</property>
<property>
<name>esIndexName</name>
<description>the elasticsearch index name</description>
</property>
<property>
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -359,6 +366,31 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark>
<ok to="index_es"/>
<error to="Kill"/>
</action>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>

View File

@ -0,0 +1,20 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramRequired": true
},
{
"paramName": "idx",
"paramLongName": "index",
"paramDescription": "the ES index",
"paramRequired": true
},
{
"paramName": "es",
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
}
]

View File

@ -1,4 +1,4 @@
<workflow-app name="create broker events" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="create broker events - partial" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
@ -80,7 +80,7 @@
</kill>
<action name="generate_events">
<action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
@ -101,6 +101,31 @@
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark>
<ok to="index_es"/>
<error to="Kill"/>
</action>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>