From 6f136734641b167fbbf9325255e1b69bdea8ef7d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 29 Jun 2020 16:33:32 +0200 Subject: [PATCH 1/3] accumulators --- dhp-workflows/dhp-broker-events/pom.xml | 10 ++- .../dhp/broker/oa/GenerateEventsJob.java | 21 +++++- .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 71 +++++++++++++++++++ .../dhp/broker/oa/matchers/UpdateMatcher.java | 17 ++++- .../dhp/broker/oa/util/EventFinder.java | 17 +++-- .../oa/generate_all/oozie_app/workflow.xml | 34 ++++++++- .../eu/dnetlib/dhp/broker/oa/index_es.json | 20 ++++++ .../broker/oa/partial/oozie_app/workflow.xml | 29 +++++++- 8 files changed, 207 insertions(+), 12 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index 424015a3c..e3182c259 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -1,5 +1,7 @@ - + dhp-workflows eu.dnetlib.dhp @@ -24,7 +26,11 @@ org.apache.spark spark-sql_2.11 - + + org.elasticsearch + elasticsearch-hadoop + + eu.dnetlib.dhp diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index dbe2fdd47..30e77be50 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -3,14 +3,18 @@ package eu.dnetlib.dhp.broker.oa; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventGroup; @@ -66,12 +71,15 @@ public class GenerateEventsJob { ClusterUtils.removeDir(spark, eventsPath); + final Map accumulators = prepareAccumulators(spark.sparkContext()); + final Dataset groups = ClusterUtils .readPath(spark, workingPath + "/duplicates", ResultGroup.class); final Dataset events = groups .map( - (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), + (MapFunction) g -> EventFinder + .generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); @@ -81,6 +89,17 @@ public class GenerateEventsJob { } + public static Map prepareAccumulators(final SparkContext sc) { + + return EventFinder + .getMatchers() + .stream() + .map(UpdateMatcher::accumulatorName) + .distinct() + .collect(Collectors.toMap(s -> s, s -> sc.longAccumulator(s))); + + } + private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java new file mode 100644 index 000000000..be8d14c5f --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -0,0 +1,71 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; + +public class IndexOnESJob { + + private static final Logger log = LoggerFactory.getLogger(IndexOnESJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + IndexOnESJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String index = parser.get("index"); + log.info("index: {}", index); + + final String indexHost = parser.get("esHost"); + log.info("indexHost: {}", indexHost); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final JavaRDD inputRdd = spark + .read() + .load(eventsPath) + .as(Encoders.bean(Event.class)) + .map(IndexOnESJob::eventAsJsonString, Encoders.STRING()) + .javaRDD(); + + final Map esCfg = new HashMap<>(); + // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", "8"); + esCfg.put("es.batch.write.retry.wait", "60s"); + esCfg.put("es.batch.size.entries", "200"); + esCfg.put("es.nodes.wan.only", "true"); + + JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + } + + private static String eventAsJsonString(final Event f) throws JsonProcessingException { + return new ObjectMapper().writeValueAsString(f); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index 7f82f9a2b..0618ff7e3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -12,6 +12,7 @@ import java.util.stream.Collectors; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.util.LongAccumulator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; @@ -36,7 +37,8 @@ public abstract class UpdateMatcher { public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res, final Collection others, - final DedupConfig dedupConfig) { + final DedupConfig dedupConfig, + final Map accumulators) { final Map> infoMap = new HashMap<>(); @@ -67,9 +69,10 @@ public abstract class UpdateMatcher { if (values.isEmpty()) { return new ArrayList<>(); } else if (values.size() > maxNumber) { - System.err.println("Too many events (" + values.size() + ") matched by " + getClass().getSimpleName()); + incrementAccumulator(accumulators, maxNumber); return values.subList(0, maxNumber); } else { + incrementAccumulator(accumulators, values.size()); return values; } } @@ -100,4 +103,14 @@ public abstract class UpdateMatcher { return highlightToStringFunction; } + public String accumulatorName() { + return "event_matcher_" + getClass().getSimpleName().toLowerCase(); + } + + public void incrementAccumulator(final Map accumulators, final long n) { + if (accumulators.containsKey(accumulatorName())) { + accumulators.get(accumulatorName()).add(n); + } + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 6dfca4fcb..5ed55247b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -3,6 +3,9 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.ArrayList; import java.util.List; +import java.util.Map; + +import org.apache.spark.util.LongAccumulator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.EventFactory; @@ -35,7 +38,7 @@ import eu.dnetlib.pace.config.DedupConfig; public class EventFinder { - private static List> matchers = new ArrayList<>(); + private static final List> matchers = new ArrayList<>(); static { matchers.add(new EnrichMissingAbstract()); matchers.add(new EnrichMissingAuthorOrcid()); @@ -47,7 +50,7 @@ public class EventFinder { matchers.add(new EnrichMorePid()); matchers.add(new EnrichMoreSubject()); - // // Advanced matchers + // Advanced matchers matchers.add(new EnrichMissingProject()); matchers.add(new EnrichMoreProject()); matchers.add(new EnrichMissingSoftware()); @@ -65,12 +68,14 @@ public class EventFinder { matchers.add(new EnrichMissingAbstract()); } - public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) { + public static EventGroup generateEvents(final ResultGroup results, + final DedupConfig dedupConfig, + final Map accumulators) { final List> list = new ArrayList<>(); for (final OaBrokerMainEntity target : results.getData()) { for (final UpdateMatcher matcher : matchers) { - list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig)); + list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators)); } } @@ -83,4 +88,8 @@ public class EventFinder { return events; } + public static List> getMatchers() { + return matchers; + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 8752200ff..b8d12c42c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -17,7 +17,14 @@ dedupConfProfId the id of a valid Dedup Configuration Profile - + + esIndexName + the elasticsearch index name + + + esIndexHost + the elasticsearch host + sparkDriverMemory memory for driver process @@ -359,6 +366,31 @@ --isLookupUrl${isLookupUrl} --dedupConfProfile${dedupConfProfId} + + + + + + + yarn + cluster + IndexOnESJob + eu.dnetlib.dhp.broker.oa.IndexOnESJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esIndexName} + --esHost${esIndexHost} + diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json new file mode 100644 index 000000000..ac1dbf786 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "o", + "paramLongName": "workingPath", + "paramDescription": "the workinh path", + "paramRequired": true + }, + { + "paramName": "idx", + "paramLongName": "index", + "paramDescription": "the ES index", + "paramRequired": true + }, + { + "paramName": "es", + "paramLongName": "esHost", + "paramDescription": "the ES host", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index fd68bfec2..f10c5d804 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + @@ -80,7 +80,7 @@ - + yarn cluster @@ -101,6 +101,31 @@ --isLookupUrl${isLookupUrl} --dedupConfProfile${dedupConfProfId} + + + + + + + yarn + cluster + IndexOnESJob + eu.dnetlib.dhp.broker.oa.IndexOnESJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esIndexName} + --esHost${esIndexHost} + From 59a5421c24976a1d8af397ea77bc72f4ce694a03 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 30 Jun 2020 16:17:09 +0200 Subject: [PATCH 2/3] indexing, accumulators, limited lists --- .../dhp/broker/oa/GenerateEventsJob.java | 16 +-- .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 7 +- .../dnetlib/dhp/broker/oa/JoinStep1Job.java | 13 +- .../dnetlib/dhp/broker/oa/JoinStep2Job.java | 13 +- .../dnetlib/dhp/broker/oa/JoinStep3Job.java | 13 +- .../dnetlib/dhp/broker/oa/JoinStep4Job.java | 13 +- .../dhp/broker/oa/PrepareGroupsJob.java | 14 +- .../broker/oa/PrepareRelatedDatasetsJob.java | 13 +- .../broker/oa/PrepareRelatedProjectsJob.java | 14 +- .../oa/PrepareRelatedPublicationsJob.java | 13 +- .../broker/oa/PrepareRelatedSoftwaresJob.java | 13 +- .../broker/oa/PrepareSimpleEntititiesJob.java | 13 +- .../dhp/broker/oa/matchers/UpdateMatcher.java | 6 +- .../dhp/broker/oa/util/ClusterUtils.java | 18 +++ .../dhp/broker/oa/util/ConversionUtils.java | 46 ++++--- .../oa/generate_all/oozie_app/workflow.xml | 2 +- .../broker/oa/partial/oozie_app/workflow.xml | 30 +---- .../broker/oa/matchers/UpdateMatcherTest.java | 125 ++++++++++++++++++ .../EnrichMissingPublicationDateTest.java | 57 ++++++++ 19 files changed, 315 insertions(+), 124 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java create mode 100644 dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index 30e77be50..d6ac71429 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -10,10 +10,8 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; -import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,17 +71,17 @@ public class GenerateEventsJob { final Map accumulators = prepareAccumulators(spark.sparkContext()); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_events"); + final Dataset groups = ClusterUtils .readPath(spark, workingPath + "/duplicates", ResultGroup.class); - final Dataset events = groups - .map( - (MapFunction) g -> EventFinder - .generateEvents(g, dedupConfig, accumulators), - Encoders.bean(EventGroup.class)) - .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class)); + final Dataset dataset = groups + .map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class)) + .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)) + .map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class)); - events.write().mode(SaveMode.Overwrite).json(eventsPath); + ClusterUtils.save(dataset, eventsPath, Event.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index be8d14c5f..36d0ffd1b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; public class IndexOnESJob { @@ -45,10 +46,8 @@ public class IndexOnESJob { final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); - final JavaRDD inputRdd = spark - .read() - .load(eventsPath) - .as(Encoders.bean(Event.class)) + final JavaRDD inputRdd = ClusterUtils + .readPath(spark, eventsPath, Event.class) .map(IndexOnESJob::eventAsJsonString, Encoders.STRING()) .javaRDD(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java index 1be782a12..f9bf2d146 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java @@ -10,8 +10,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +52,8 @@ public class JoinStep1Job { ClusterUtils.removeDir(spark, joinedEntitiesPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); + final Dataset sources = ClusterUtils .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); @@ -61,16 +63,15 @@ public class JoinStep1Job { final TypedColumn, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() .toColumn(); - sources + final Dataset dataset = sources .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .json(joinedEntitiesPath); + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); + + ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java index 103d79553..cdcf0add4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java @@ -10,8 +10,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +52,8 @@ public class JoinStep2Job { ClusterUtils.removeDir(spark, joinedEntitiesPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); + final Dataset sources = ClusterUtils .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); @@ -61,16 +63,15 @@ public class JoinStep2Job { final TypedColumn, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator() .toColumn(); - sources + final Dataset dataset = sources .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .json(joinedEntitiesPath); + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); + + ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java index ceb199dc4..4d06f6f13 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java @@ -10,8 +10,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +52,8 @@ public class JoinStep3Job { ClusterUtils.removeDir(spark, joinedEntitiesPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); + final Dataset sources = ClusterUtils .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class); @@ -61,16 +63,15 @@ public class JoinStep3Job { final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator() .toColumn(); - sources + final Dataset dataset = sources .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .json(joinedEntitiesPath); + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); + + ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java index 3067810dd..b53d7e39b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java @@ -10,8 +10,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +52,8 @@ public class JoinStep4Job { ClusterUtils.removeDir(spark, joinedEntitiesPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); + final Dataset sources = ClusterUtils .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class); @@ -61,16 +63,15 @@ public class JoinStep4Job { final TypedColumn, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator() .toColumn(); - sources + final Dataset dataset = sources .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey( (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) - .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)) - .write() - .mode(SaveMode.Overwrite) - .json(joinedEntitiesPath); + .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); + + ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index 47a9f36c5..eb9add00d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -10,8 +10,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +57,8 @@ public class PrepareGroupsJob { ClusterUtils.removeDir(spark, groupsPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups"); + final Dataset results = ClusterUtils .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class); @@ -67,20 +69,16 @@ public class PrepareGroupsJob { final TypedColumn, ResultGroup> aggr = new ResultAggregator() .toColumn(); - final Dataset groups = results + final Dataset dataset = results .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") .groupByKey( (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) - .map( - (MapFunction, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class)) + .map(t -> t._2, Encoders.bean(ResultGroup.class)) .filter(rg -> rg.getData().size() > 1); - groups - .write() - .mode(SaveMode.Overwrite) - .json(groupsPath); + ClusterUtils.save(dataset, groupsPath, ResultGroup.class, total); }); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index 6e006ccf0..0cfc1adcb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +54,8 @@ public class PrepareRelatedDatasetsJob { ClusterUtils.removeDir(spark, relsPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels"); + final Dataset datasets = ClusterUtils .readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class) .filter(d -> !ClusterUtils.isDedupRoot(d.getId())) @@ -67,16 +69,15 @@ public class PrepareRelatedDatasetsJob { .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); - rels + final Dataset dataset = rels .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") .map(t -> { final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2); rel.getRelDataset().setRelType(t._1.getRelClass()); return rel; - }, Encoders.bean(RelatedDataset.class)) - .write() - .mode(SaveMode.Overwrite) - .json(relsPath); + }, Encoders.bean(RelatedDataset.class)); + + ClusterUtils.save(dataset, relsPath, RelatedDataset.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index 0af5d21b7..e988366c8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +56,8 @@ public class PrepareRelatedProjectsJob { ClusterUtils.removeDir(spark, relsPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels"); + final Dataset projects = ClusterUtils .readPath(spark, graphPath + "/project", Project.class) .filter(p -> !ClusterUtils.isDedupRoot(p.getId())) @@ -69,12 +71,12 @@ public class PrepareRelatedProjectsJob { .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); - rels + final Dataset dataset = rels .joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner") - .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class)) - .write() - .mode(SaveMode.Overwrite) - .json(relsPath); + .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class)); + + ClusterUtils.save(dataset, relsPath, RelatedProject.class, total); + }); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index 84752776e..724acc4dc 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +55,8 @@ public class PrepareRelatedPublicationsJob { ClusterUtils.removeDir(spark, relsPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels"); + final Dataset pubs = ClusterUtils .readPath(spark, graphPath + "/publication", Publication.class) .filter(p -> !ClusterUtils.isDedupRoot(p.getId())) @@ -70,16 +72,15 @@ public class PrepareRelatedPublicationsJob { .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); - rels + final Dataset dataset = rels .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") .map(t -> { final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2); rel.getRelPublication().setRelType(t._1.getRelClass()); return rel; - }, Encoders.bean(RelatedPublication.class)) - .write() - .mode(SaveMode.Overwrite) - .json(relsPath); + }, Encoders.bean(RelatedPublication.class)); + + ClusterUtils.save(dataset, relsPath, RelatedPublication.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index 0ad753a97..d15565d0d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -9,7 +9,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +56,8 @@ public class PrepareRelatedSoftwaresJob { ClusterUtils.removeDir(spark, relsPath); + final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels"); + final Dataset softwares = ClusterUtils .readPath(spark, graphPath + "/software", Software.class) .filter(sw -> !ClusterUtils.isDedupRoot(sw.getId())) @@ -69,12 +71,11 @@ public class PrepareRelatedSoftwaresJob { .filter(r -> !ClusterUtils.isDedupRoot(r.getSource())) .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget())); - rels + final Dataset dataset = rels .joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner") - .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class)) - .write() - .mode(SaveMode.Overwrite) - .json(relsPath); + .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class)); + + ClusterUtils.save(dataset, relsPath, RelatedSoftware.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java index 1b9c279fd..d3c7113ec 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java @@ -9,8 +9,8 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,13 +56,14 @@ public class PrepareSimpleEntititiesJob { ClusterUtils.removeDir(spark, simpleEntitiesPath); - prepareSimpleEntities(spark, graphPath, Publication.class) + final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); + + final Dataset dataset = prepareSimpleEntities(spark, graphPath, Publication.class) .union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class)) .union(prepareSimpleEntities(spark, graphPath, Software.class)) - .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class)) - .write() - .mode(SaveMode.Overwrite) - .json(simpleEntitiesPath); + .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class)); + + ClusterUtils.save(dataset, simpleEntitiesPath, OaBrokerMainEntity.class, total); }); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index 0618ff7e3..af6ab30a1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -83,8 +83,8 @@ public abstract class UpdateMatcher { return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0)); } - protected boolean isMissing(final String field) { - return StringUtils.isBlank(field); + protected boolean isMissing(final String s) { + return StringUtils.isBlank(s); } public int getMaxNumber() { @@ -108,7 +108,7 @@ public abstract class UpdateMatcher { } public void incrementAccumulator(final Map accumulators, final long n) { - if (accumulators.containsKey(accumulatorName())) { + if (accumulators != null && accumulators.containsKey(accumulatorName())) { accumulators.get(accumulatorName()).add(n); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index de9b901d0..2d0106a7a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -4,7 +4,9 @@ package eu.dnetlib.dhp.broker.oa.util; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -44,4 +46,20 @@ public class ClusterUtils { || s.equals("isSupplementedTo"); } + public static T incrementAccumulator(final T o, final LongAccumulator acc) { + if (acc != null) { + acc.add(1); + } + return o; + } + + public static void save(final Dataset dataset, final String path, final Class clazz, + final LongAccumulator acc) { + dataset + .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .json(path); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index b61d5e7cc..26a9e9471 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -55,7 +55,7 @@ public class ConversionUtils { res.setLicense(BrokerConstants.OPEN_ACCESS); res.setHostedby(kvValue(i.getHostedby())); return res; - }); + }, 20); } public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) { @@ -75,8 +75,8 @@ public class ConversionUtils { res.setOpenaireId(d.getId()); res.setOriginalId(first(d.getOriginalId())); res.setTitle(structPropValue(d.getTitle())); - res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid)); - res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); + res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid, 20)); + res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20)); res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue)); return res; } @@ -90,8 +90,8 @@ public class ConversionUtils { res.setOpenaireId(p.getId()); res.setOriginalId(first(p.getOriginalId())); res.setTitle(structPropValue(p.getTitle())); - res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid)); - res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); + res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid, 20)); + res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20)); res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue)); return res; @@ -107,23 +107,25 @@ public class ConversionUtils { res.setOpenaireId(result.getId()); res.setOriginalId(first(result.getOriginalId())); res.setTypology(classId(result.getResulttype())); - res.setTitles(structPropList(result.getTitle())); - res.setAbstracts(fieldList(result.getDescription())); + res.setTitles(structPropList(result.getTitle(), 10)); + res.setAbstracts(fieldList(result.getDescription(), 10)); res.setLanguage(classId(result.getLanguage())); res.setSubjects(structPropTypedList(result.getSubject())); - res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor)); + res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor, 30)); res.setPublicationdate(fieldValue(result.getDateofacceptance())); res.setPublisher(fieldValue(result.getPublisher())); res.setEmbargoenddate(fieldValue(result.getEmbargoenddate())); - res.setContributor(fieldList(result.getContributor())); + res.setContributor(fieldList(result.getContributor(), 20)); res .setJournal( result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey)); res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); - res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); - res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); - res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef)); + res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid, 20)); + res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20)); + res + .setExternalReferences( + mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef, 20)); return res; } @@ -243,18 +245,25 @@ public class ConversionUtils { : null; } - private static List fieldList(final List> fl) { + private static List fieldList(final List> fl, final long maxSize) { return fl != null - ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList()) + ? fl + .stream() + .map(Field::getValue) + .map(s -> StringUtils.abbreviate(s, 3000)) // MAX 3000 CHARS + .filter(StringUtils::isNotBlank) + .limit(maxSize) + .collect(Collectors.toList()) : new ArrayList<>(); } - private static List structPropList(final List props) { + private static List structPropList(final List props, final long maxSize) { return props != null ? props .stream() .map(StructuredProperty::getValue) .filter(StringUtils::isNotBlank) + .limit(maxSize) .collect(Collectors.toList()) : new ArrayList<>(); } @@ -271,7 +280,7 @@ public class ConversionUtils { .collect(Collectors.toList()); } - private static List mappedList(final List list, final Function func) { + private static List mappedList(final List list, final Function func, final long maxSize) { if (list == null) { return new ArrayList<>(); } @@ -280,10 +289,12 @@ public class ConversionUtils { .stream() .map(func::apply) .filter(Objects::nonNull) + .limit(maxSize) .collect(Collectors.toList()); } - private static List flatMappedList(final List list, final Function> func) { + private static List flatMappedList(final List list, final Function> func, + final long maxSize) { if (list == null) { return new ArrayList<>(); } @@ -293,6 +304,7 @@ public class ConversionUtils { .map(func::apply) .flatMap(List::stream) .filter(Objects::nonNull) + .limit(maxSize) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index b8d12c42c..7667bfba7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -378,9 +378,9 @@ eu.dnetlib.dhp.broker.oa.IndexOnESJob dhp-broker-events-${projectVersion}.jar - --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="2" --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index f10c5d804..9128c9820 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -78,9 +78,8 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - + + yarn cluster @@ -101,31 +100,6 @@ --isLookupUrl${isLookupUrl} --dedupConfProfile${dedupConfProfId} - - - - - - - yarn - cluster - IndexOnESJob - eu.dnetlib.dhp.broker.oa.IndexOnESJob - dhp-broker-events-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - - --workingPath${workingPath} - --index${esIndexName} - --esHost${esIndexHost} - diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java new file mode 100644 index 000000000..93bc5617f --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java @@ -0,0 +1,125 @@ + +package eu.dnetlib.dhp.broker.oa.matchers; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Collection; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; +import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; + +class UpdateMatcherTest { + + UpdateMatcher matcher = new EnrichMissingPublicationDate(); + + @BeforeEach + void setUp() throws Exception { + } + + @Test + void testSearchUpdatesForRecord_1() { + final OaBrokerMainEntity res = new OaBrokerMainEntity(); + final OaBrokerMainEntity p1 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p2 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p3 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); + + final Collection> list = matcher + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + + assertTrue(list.isEmpty()); + } + + @Test + void testSearchUpdatesForRecord_2() { + final OaBrokerMainEntity res = new OaBrokerMainEntity(); + final OaBrokerMainEntity p1 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p2 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p3 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); + + res.setPublicationdate("2018"); + + final Collection> list = matcher + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + + assertTrue(list.isEmpty()); + } + + @Test + void testSearchUpdatesForRecord_3() { + final OaBrokerMainEntity res = new OaBrokerMainEntity(); + final OaBrokerMainEntity p1 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p2 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p3 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); + + p2.setPublicationdate("2018"); + + final Collection> list = matcher + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + + assertTrue(list.size() == 1); + } + + @Test + void testSearchUpdatesForRecord_4() { + final OaBrokerMainEntity res = new OaBrokerMainEntity(); + final OaBrokerMainEntity p1 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p2 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p3 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); + + res.setPublicationdate("2018"); + p2.setPublicationdate("2018"); + + final Collection> list = matcher + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + + assertTrue(list.isEmpty()); + } + + @Test + void testSearchUpdatesForRecord_5() { + final OaBrokerMainEntity res = new OaBrokerMainEntity(); + final OaBrokerMainEntity p1 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p2 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p3 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); + res.setPublicationdate("2018"); + p1.setPublicationdate("2018"); + p2.setPublicationdate("2018"); + p3.setPublicationdate("2018"); + p4.setPublicationdate("2018"); + + final Collection> list = matcher + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + + assertTrue(list.isEmpty()); + } + + @Test + void testSearchUpdatesForRecord_6() { + final OaBrokerMainEntity res = new OaBrokerMainEntity(); + final OaBrokerMainEntity p1 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p2 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p3 = new OaBrokerMainEntity(); + final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); + + p1.setPublicationdate("2018"); + p2.setPublicationdate("2018"); + p3.setPublicationdate("2018"); + p4.setPublicationdate("2018"); + + final Collection> list = matcher + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + + assertTrue(list.size() == 1); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java new file mode 100644 index 000000000..77a19af4c --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java @@ -0,0 +1,57 @@ + +package eu.dnetlib.dhp.broker.oa.matchers.simple; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; + +class EnrichMissingPublicationDateTest { + + final EnrichMissingPublicationDate matcher = new EnrichMissingPublicationDate(); + + @BeforeEach + void setUp() throws Exception { + } + + @Test + void testFindDifferences_1() { + final OaBrokerMainEntity source = new OaBrokerMainEntity(); + final OaBrokerMainEntity target = new OaBrokerMainEntity(); + final List list = matcher.findDifferences(source, target); + assertTrue(list.isEmpty()); + } + + @Test + void testFindDifferences_2() { + final OaBrokerMainEntity source = new OaBrokerMainEntity(); + final OaBrokerMainEntity target = new OaBrokerMainEntity(); + source.setPublicationdate("2018"); + final List list = matcher.findDifferences(source, target); + assertTrue(list.size() == 1); + } + + @Test + void testFindDifferences_3() { + final OaBrokerMainEntity source = new OaBrokerMainEntity(); + final OaBrokerMainEntity target = new OaBrokerMainEntity(); + target.setPublicationdate("2018"); + final List list = matcher.findDifferences(source, target); + assertTrue(list.isEmpty()); + } + + @Test + void testFindDifferences_4() { + final OaBrokerMainEntity source = new OaBrokerMainEntity(); + final OaBrokerMainEntity target = new OaBrokerMainEntity(); + source.setPublicationdate("2018"); + target.setPublicationdate("2018"); + final List list = matcher.findDifferences(source, target); + assertTrue(list.isEmpty()); + } + +} From 3bcdfbabe9dbce86785e12c1052154c015f9022e Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 1 Jul 2020 08:42:39 +0200 Subject: [PATCH 3/3] list with limits --- .../AbstractEnrichMissingDataset.java | 6 +++ .../relatedProjects/EnrichMoreProject.java | 6 +++ .../AbstractEnrichMissingPublication.java | 6 +++ .../relatedSoftware/EnrichMoreSoftware.java | 6 +++ .../simple/EnrichMissingAuthorOrcid.java | 6 +++ .../simple/EnrichMissingOpenAccess.java | 6 +++ .../oa/matchers/simple/EnrichMissingPid.java | 3 +- .../matchers/simple/EnrichMissingSubject.java | 7 +++ .../matchers/simple/EnrichMoreOpenAccess.java | 6 +++ .../oa/matchers/simple/EnrichMorePid.java | 7 +++ .../oa/matchers/simple/EnrichMoreSubject.java | 6 +++ .../dhp/broker/oa/util/BrokerConstants.java | 4 ++ .../dhp/broker/oa/util/ConversionUtils.java | 44 +++++++++---------- 13 files changed, 88 insertions(+), 25 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java index f21c1c7b3..2f73a2448 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedDatasets/AbstractEnrichMissingDataset.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public abstract class AbstractEnrichMissingDataset extends UpdateMatcher { @@ -25,6 +27,10 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + if (target.getDatasets().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingDatasets = target .getDatasets() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index 85b2cbe28..6a10f19be 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMoreProject extends UpdateMatcher { @@ -27,6 +29,10 @@ public class EnrichMoreProject extends UpdateMatcher { protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + if (target.getProjects().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingProjects = target .getProjects() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index f951131b1..7ba3e5e02 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedPublications; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public abstract class AbstractEnrichMissingPublication extends UpdateMatcher { @@ -27,6 +29,10 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingPublications = target .getPublications() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index 2bc370187..a6cd34359 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMoreSoftware extends UpdateMatcher { @@ -24,6 +26,10 @@ public class EnrichMoreSoftware extends UpdateMatcher { final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + if (target.getSoftwares().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingSoftwares = source .getSoftwares() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index 7bbc43fe3..e834d1dde 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -11,6 +12,7 @@ import eu.dnetlib.broker.objects.OaBrokerAuthor; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMissingAuthorOrcid extends UpdateMatcher { @@ -25,6 +27,10 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher { protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + if (target.getCreators().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingOrcids = target .getCreators() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index 41a00dcd1..8e4f2fcf4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -23,6 +24,11 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + + if (target.getInstances().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final long count = target .getInstances() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java index 4863bdeb7..4e4003890 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPid.java @@ -22,9 +22,8 @@ public class EnrichMissingPid extends UpdateMatcher { @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { - final long count = target.getPids().size(); - if (count > 0) { + if (target.getPids().size() > 0) { return Arrays.asList(); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java index f762e3f52..26ebbb7c0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSubject.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMissingSubject extends UpdateMatcher { @@ -22,6 +24,11 @@ public class EnrichMissingSubject extends UpdateMatcher { @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + + if (target.getSubjects().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingSubject = target .getSubjects() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index 9ce362a97..46f6fa80c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -23,6 +24,11 @@ public class EnrichMoreOpenAccess extends UpdateMatcher { @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + + if (target.getInstances().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set urls = target .getInstances() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java index 583960037..609437b9d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMorePid.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMorePid extends UpdateMatcher { @@ -22,6 +24,11 @@ public class EnrichMorePid extends UpdateMatcher { @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + + if (target.getPids().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingPids = target .getPids() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java index 150029462..bbe6609d7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSubject.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -9,6 +10,7 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; public class EnrichMoreSubject extends UpdateMatcher { @@ -23,6 +25,10 @@ public class EnrichMoreSubject extends UpdateMatcher { protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { + if (target.getSubjects().size() >= BrokerConstants.MAX_LIST_SIZE) { + return new ArrayList<>(); + } + final Set existingSubjects = target .getSubjects() .stream() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java index 58e41acbb..5308b9dff 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java @@ -19,6 +19,10 @@ public class BrokerConstants { public static final int MAX_NUMBER_OF_RELS = 20; + public static final int MAX_STRING_SIZE = 3000; + + public static final int MAX_LIST_SIZE = 50; + public static Class[] getModelClasses() { final Set> list = new HashSet<>(); list.addAll(Arrays.asList(ModelSupport.getOafModelClasses())); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 26a9e9471..1ce84283a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -55,7 +55,7 @@ public class ConversionUtils { res.setLicense(BrokerConstants.OPEN_ACCESS); res.setHostedby(kvValue(i.getHostedby())); return res; - }, 20); + }); } public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) { @@ -75,8 +75,8 @@ public class ConversionUtils { res.setOpenaireId(d.getId()); res.setOriginalId(first(d.getOriginalId())); res.setTitle(structPropValue(d.getTitle())); - res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid, 20)); - res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20)); + res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid)); + res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue)); return res; } @@ -90,8 +90,8 @@ public class ConversionUtils { res.setOpenaireId(p.getId()); res.setOriginalId(first(p.getOriginalId())); res.setTitle(structPropValue(p.getTitle())); - res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid, 20)); - res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20)); + res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid)); + res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue)); return res; @@ -107,25 +107,24 @@ public class ConversionUtils { res.setOpenaireId(result.getId()); res.setOriginalId(first(result.getOriginalId())); res.setTypology(classId(result.getResulttype())); - res.setTitles(structPropList(result.getTitle(), 10)); - res.setAbstracts(fieldList(result.getDescription(), 10)); + res.setTitles(structPropList(result.getTitle())); + res.setAbstracts(fieldList(result.getDescription())); res.setLanguage(classId(result.getLanguage())); res.setSubjects(structPropTypedList(result.getSubject())); - res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor, 30)); + res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor)); res.setPublicationdate(fieldValue(result.getDateofacceptance())); res.setPublisher(fieldValue(result.getPublisher())); res.setEmbargoenddate(fieldValue(result.getEmbargoenddate())); - res.setContributor(fieldList(result.getContributor(), 20)); + res.setContributor(fieldList(result.getContributor())); res .setJournal( result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey)); res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); - res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid, 20)); - res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20)); + res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); + res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res - .setExternalReferences( - mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef, 20)); + .setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef)); return res; } @@ -245,25 +244,25 @@ public class ConversionUtils { : null; } - private static List fieldList(final List> fl, final long maxSize) { + private static List fieldList(final List> fl) { return fl != null ? fl .stream() .map(Field::getValue) - .map(s -> StringUtils.abbreviate(s, 3000)) // MAX 3000 CHARS + .map(s -> StringUtils.abbreviate(s, BrokerConstants.MAX_STRING_SIZE)) .filter(StringUtils::isNotBlank) - .limit(maxSize) + .limit(BrokerConstants.MAX_LIST_SIZE) .collect(Collectors.toList()) : new ArrayList<>(); } - private static List structPropList(final List props, final long maxSize) { + private static List structPropList(final List props) { return props != null ? props .stream() .map(StructuredProperty::getValue) .filter(StringUtils::isNotBlank) - .limit(maxSize) + .limit(BrokerConstants.MAX_LIST_SIZE) .collect(Collectors.toList()) : new ArrayList<>(); } @@ -280,7 +279,7 @@ public class ConversionUtils { .collect(Collectors.toList()); } - private static List mappedList(final List list, final Function func, final long maxSize) { + private static List mappedList(final List list, final Function func) { if (list == null) { return new ArrayList<>(); } @@ -289,12 +288,11 @@ public class ConversionUtils { .stream() .map(func::apply) .filter(Objects::nonNull) - .limit(maxSize) + .limit(BrokerConstants.MAX_LIST_SIZE) .collect(Collectors.toList()); } - private static List flatMappedList(final List list, final Function> func, - final long maxSize) { + private static List flatMappedList(final List list, final Function> func) { if (list == null) { return new ArrayList<>(); } @@ -304,7 +302,7 @@ public class ConversionUtils { .map(func::apply) .flatMap(List::stream) .filter(Objects::nonNull) - .limit(maxSize) + .limit(BrokerConstants.MAX_LIST_SIZE) .collect(Collectors.toList()); }