diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
index 30e77be50..d6ac71429 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -10,10 +10,8 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,17 +71,17 @@ public class GenerateEventsJob {
 
             final Map<String, LongAccumulator> accumulators = prepareAccumulators(spark.sparkContext());
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
+
             final Dataset<ResultGroup> groups = ClusterUtils
                 .readPath(spark, workingPath + "/duplicates", ResultGroup.class);
 
-            final Dataset<Event> events = groups
-                .map(
-                    (MapFunction<ResultGroup, EventGroup>) g -> EventFinder
-                        .generateEvents(g, dedupConfig, accumulators),
-                    Encoders.bean(EventGroup.class))
-                .flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
+            final Dataset<Event> dataset = groups
+                .map(g -> EventFinder.generateEvents(g, dedupConfig, accumulators), Encoders.bean(EventGroup.class))
+                .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class))
+                .map(e -> ClusterUtils.incrementAccumulator(e, total), Encoders.bean(Event.class));
 
-            events.write().mode(SaveMode.Overwrite).json(eventsPath);
+            ClusterUtils.save(dataset, eventsPath, Event.class, total);
 
         });
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
index be8d14c5f..36d0ffd1b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
@@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.broker.model.Event;
+import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
 
 public class IndexOnESJob {
@@ -45,10 +46,8 @@ public class IndexOnESJob {
 
         final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
 
-        final JavaRDD<String> inputRdd = spark
-            .read()
-            .load(eventsPath)
-            .as(Encoders.bean(Event.class))
+        final JavaRDD<String> inputRdd = ClusterUtils
+            .readPath(spark, eventsPath, Event.class)
             .map(IndexOnESJob::eventAsJsonString, Encoders.STRING())
             .javaRDD();
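The pattern this patch introduces everywhere, counting records through a named LongAccumulator while writing, is worth seeing in isolation. A minimal, self-contained sketch under illustrative names (class, data, and output path are not part of the patch):

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class CountingSaveSketch {

    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        // named accumulator: the running value is also visible in the Spark UI
        final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");

        final Dataset<String> events = spark
            .createDataset(Arrays.asList("event-1", "event-2", "event-3"), Encoders.STRING());

        // pass-through map increments the counter as records flow to the writer
        events
            .map((MapFunction<String, String>) e -> {
                total.add(1);
                return e;
            }, Encoders.STRING())
            .write()
            .mode(SaveMode.Overwrite)
            .json("/tmp/counting_save_sketch");

        // accumulator values are only reliable on the driver after the action completes,
        // and stage retries can overcount: good for monitoring, not for exact audits
        System.out.println("written: " + total.value());

        spark.stop();
    }
}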
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
index 1be782a12..f9bf2d146 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
@@ -10,8 +10,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep1Job {
 
             ClusterUtils.removeDir(spark, joinedEntitiesPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
             final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                 .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep1Job {
             final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
                 .toColumn();
 
-            sources
+            final Dataset<OaBrokerMainEntity> dataset = sources
                 .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                 .groupByKey(
                     (MapFunction<Tuple2<OaBrokerMainEntity, RelatedProject>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
                 .agg(aggr)
-                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(joinedEntitiesPath);
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+            ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
 
         });
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
index 103d79553..cdcf0add4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
@@ -10,8 +10,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep2Job {
 
             ClusterUtils.removeDir(spark, joinedEntitiesPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
             final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                 .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep2Job {
             final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
                 .toColumn();
 
-            sources
+            final Dataset<OaBrokerMainEntity> dataset = sources
                 .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                 .groupByKey(
                     (MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
                 .agg(aggr)
-                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(joinedEntitiesPath);
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+            ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
 
         });
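All four join steps share one shape: a left-outer joinWith, groupByKey on the entity id, a custom Aggregator applied through agg as a TypedColumn, then a map back to the entity. A stripped-down sketch of that shape over plain strings (the real aggregators, e.g. RelatedProjectAggregator, fold related entities into OaBrokerMainEntity; everything below is illustrative):

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

public class AggregatorSketch {

    // folds all (id, rel) pairs of one key into a single string, the way the
    // RelatedXxxAggregator classes fold relations into the main entity
    static class ConcatRels extends Aggregator<Tuple2<String, String>, String, String> {

        @Override
        public String zero() {
            return "";
        }

        @Override
        public String reduce(final String b, final Tuple2<String, String> t) {
            return b.isEmpty() ? t._2 : b + "," + t._2;
        }

        @Override
        public String merge(final String b1, final String b2) {
            return b1.isEmpty() ? b2 : b2.isEmpty() ? b1 : b1 + "," + b2;
        }

        @Override
        public String finish(final String b) {
            return b;
        }

        @Override
        public Encoder<String> bufferEncoder() {
            return Encoders.STRING();
        }

        @Override
        public Encoder<String> outputEncoder() {
            return Encoders.STRING();
        }
    }

    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        // stands in for the result of the left-outer joinWith
        final Dataset<Tuple2<String, String>> joined = spark
            .createDataset(
                Arrays.asList(
                    new Tuple2<>("id1", "relA"),
                    new Tuple2<>("id1", "relB"),
                    new Tuple2<>("id2", "relC")),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        joined
            .groupByKey((MapFunction<Tuple2<String, String>, String>) t -> t._1, Encoders.STRING())
            .agg(new ConcatRels().toColumn())
            .map((MapFunction<Tuple2<String, String>, String>) t -> t._1 + " -> " + t._2, Encoders.STRING())
            .show();

        spark.stop();
    }
}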
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
index ceb199dc4..4d06f6f13 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
@@ -10,8 +10,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep3Job {
 
             ClusterUtils.removeDir(spark, joinedEntitiesPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
             final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                 .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep3Job {
             final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
                 .toColumn();
 
-            sources
+            final Dataset<OaBrokerMainEntity> dataset = sources
                 .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                 .groupByKey(
                     (MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
                 .agg(aggr)
-                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(joinedEntitiesPath);
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+            ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
 
         });
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
index 3067810dd..b53d7e39b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
@@ -10,8 +10,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +52,8 @@ public class JoinStep4Job {
 
             ClusterUtils.removeDir(spark, joinedEntitiesPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
             final Dataset<OaBrokerMainEntity> sources = ClusterUtils
                 .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
@@ -61,16 +63,15 @@ public class JoinStep4Job {
             final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
                 .toColumn();
 
-            sources
+            final Dataset<OaBrokerMainEntity> dataset = sources
                 .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
                 .groupByKey(
                     (MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
                 .agg(aggr)
-                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(joinedEntitiesPath);
+                .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
+
+            ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
 
         });
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
index 47a9f36c5..eb9add00d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
@@ -10,8 +10,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.TypedColumn;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +57,8 @@ public class PrepareGroupsJob {
 
             ClusterUtils.removeDir(spark, groupsPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups");
+
             final Dataset<OaBrokerMainEntity> results = ClusterUtils
                 .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
@@ -67,20 +69,16 @@ public class PrepareGroupsJob {
             final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
                 .toColumn();
 
-            final Dataset<ResultGroup> groups = results
+            final Dataset<ResultGroup> dataset = results
                 .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
                 .groupByKey(
                     (MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
                 .agg(aggr)
-                .map(
-                    (MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
+                .map(t -> t._2, Encoders.bean(ResultGroup.class))
                 .filter(rg -> rg.getData().size() > 1);
 
-            groups
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(groupsPath);
+            ClusterUtils.save(dataset, groupsPath, ResultGroup.class, total);
 
         });
     }
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
index 6e006ccf0..0cfc1adcb 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
@@ -9,7 +9,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +54,8 @@ public class PrepareRelatedDatasetsJob {
 
             ClusterUtils.removeDir(spark, relsPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
             final Dataset<OaBrokerRelatedDataset> datasets = ClusterUtils
                 .readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
                 .filter(d -> !ClusterUtils.isDedupRoot(d.getId()))
@@ -67,16 +69,15 @@ public class PrepareRelatedDatasetsJob {
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
-            rels
+            final Dataset<RelatedDataset> dataset = rels
                 .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
                 .map(t -> {
                     final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
                     rel.getRelDataset().setRelType(t._1.getRelClass());
                     return rel;
-                }, Encoders.bean(RelatedDataset.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(relsPath);
+                }, Encoders.bean(RelatedDataset.class));
+
+            ClusterUtils.save(dataset, relsPath, RelatedDataset.class, total);
 
         });
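The PrepareRelatedXxx jobs share the other recurring shape: an inner joinWith between filtered relations and their target entities, mapped into a small "related" bean. A toy version with a hypothetical Rel bean (the real code builds RelatedDataset, RelatedProject, RelatedPublication, RelatedSoftware):

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class RelationJoinSketch {

    // hypothetical stand-in for RelatedDataset and friends
    public static class Rel implements Serializable {

        private String source;
        private String targetLabel;

        public String getSource() {
            return source;
        }

        public void setSource(final String source) {
            this.source = source;
        }

        public String getTargetLabel() {
            return targetLabel;
        }

        public void setTargetLabel(final String targetLabel) {
            this.targetLabel = targetLabel;
        }
    }

    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

        // (source, target) relations and (id, label) target entities
        final Dataset<Tuple2<String, String>> rels = spark
            .createDataset(Arrays.asList(new Tuple2<>("s1", "t1")), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
        final Dataset<Tuple2<String, String>> targets = spark
            .createDataset(Arrays.asList(new Tuple2<>("t1", "A title")), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // inner joinWith keeps (rel, target) pairs; map folds them into one output bean
        final Dataset<Rel> out = rels
            .joinWith(targets, rels.col("_2").equalTo(targets.col("_1")), "inner")
            .map((MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, String>>, Rel>) t -> {
                final Rel r = new Rel();
                r.setSource(t._1._1); // relation source id
                r.setTargetLabel(t._2._2); // decorated with the joined target
                return r;
            }, Encoders.bean(Rel.class));

        out.show();
        spark.stop();
    }
}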
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
index 0af5d21b7..e988366c8 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
@@ -9,7 +9,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +56,8 @@ public class PrepareRelatedProjectsJob {
 
             ClusterUtils.removeDir(spark, relsPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
             final Dataset<OaBrokerProject> projects = ClusterUtils
                 .readPath(spark, graphPath + "/project", Project.class)
                 .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
@@ -69,12 +71,12 @@ public class PrepareRelatedProjectsJob {
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
-            rels
+            final Dataset<RelatedProject> dataset = rels
                 .joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner")
-                .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(relsPath);
+                .map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class));
+
+            ClusterUtils.save(dataset, relsPath, RelatedProject.class, total);
+
         });
     }
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
index 84752776e..724acc4dc 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
@@ -9,7 +9,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +55,8 @@ public class PrepareRelatedPublicationsJob {
 
             ClusterUtils.removeDir(spark, relsPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
             final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils
                 .readPath(spark, graphPath + "/publication", Publication.class)
                 .filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
@@ -70,16 +72,15 @@ public class PrepareRelatedPublicationsJob {
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
-            rels
+            final Dataset<RelatedPublication> dataset = rels
                 .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
                 .map(t -> {
                     final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
                     rel.getRelPublication().setRelType(t._1.getRelClass());
                     return rel;
-                }, Encoders.bean(RelatedPublication.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(relsPath);
+                }, Encoders.bean(RelatedPublication.class));
+
+            ClusterUtils.save(dataset, relsPath, RelatedPublication.class, total);
 
         });
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
index 0ad753a97..d15565d0d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
@@ -9,7 +9,7 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +56,8 @@ public class PrepareRelatedSoftwaresJob {
 
             ClusterUtils.removeDir(spark, relsPath);
 
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_rels");
+
             final Dataset<OaBrokerRelatedSoftware> softwares = ClusterUtils
                 .readPath(spark, graphPath + "/software", Software.class)
                 .filter(sw -> !ClusterUtils.isDedupRoot(sw.getId()))
@@ -69,12 +71,11 @@ public class PrepareRelatedSoftwaresJob {
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
                 .filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
 
-            rels
+            final Dataset<RelatedSoftware> dataset = rels
                 .joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner")
-                .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(relsPath);
+                .map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class));
+
+            ClusterUtils.save(dataset, relsPath, RelatedSoftware.class, total);
 
         });
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
index 1b9c279fd..d3c7113ec 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
@@ -9,8 +9,8 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,13 +56,14 @@ public class PrepareSimpleEntititiesJob {
 
             ClusterUtils.removeDir(spark, simpleEntitiesPath);
 
-            prepareSimpleEntities(spark, graphPath, Publication.class)
+            final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
+
+            final Dataset<OaBrokerMainEntity> dataset = prepareSimpleEntities(spark, graphPath, Publication.class)
                 .union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
                 .union(prepareSimpleEntities(spark, graphPath, Software.class))
-                .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .json(simpleEntitiesPath);
+                .union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class));
+
+            ClusterUtils.save(dataset, simpleEntitiesPath, OaBrokerMainEntity.class, total);
 
         });
     }
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
index 0618ff7e3..af6ab30a1 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java
@@ -83,8 +83,8 @@ public abstract class UpdateMatcher<T> {
         return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
     }
 
-    protected boolean isMissing(final String field) {
-        return StringUtils.isBlank(field);
+    protected boolean isMissing(final String s) {
+        return StringUtils.isBlank(s);
     }
 
     public int getMaxNumber() {
@@ -108,7 +108,7 @@ public abstract class UpdateMatcher<T> {
     }
 
     public void incrementAccumulator(final Map<String, LongAccumulator> accumulators, final long n) {
-        if (accumulators.containsKey(accumulatorName())) {
+        if (accumulators != null && accumulators.containsKey(accumulatorName())) {
             accumulators.get(accumulatorName()).add(n);
         }
     }
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
index de9b901d0..2d0106a7a 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
@@ -4,7 +4,9 @@ package eu.dnetlib.dhp.broker.oa.util;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.util.LongAccumulator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,4 +46,20 @@ public class ClusterUtils {
             || s.equals("isSupplementedTo");
     }
 
+    public static <T> T incrementAccumulator(final T o, final LongAccumulator acc) {
+        if (acc != null) {
+            acc.add(1);
+        }
+        return o;
+    }
+
+    public static <T> void save(final Dataset<T> dataset, final String path, final Class<T> clazz,
+        final LongAccumulator acc) {
+        dataset
+            .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .json(path);
+    }
+
 }
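With the two helpers above, every job's tail collapses to "read, transform, save with a counter", and the null guards in both ClusterUtils.incrementAccumulator and UpdateMatcher.incrementAccumulator let the same code run in unit tests with no SparkContext and no accumulators at all. A sketch of how a per-topic accumulator registry like prepareAccumulators can be built and safely incremented (names are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorRegistrySketch {

    // one named accumulator per matcher/topic, visible in the Spark UI
    public static Map<String, LongAccumulator> register(final SparkContext sc, final String... names) {
        final Map<String, LongAccumulator> map = new HashMap<>();
        for (final String name : names) {
            map.put(name, sc.longAccumulator(name));
        }
        return map;
    }

    // mirrors the guarded increment: a null map (e.g. in tests) is a no-op
    public static void increment(final Map<String, LongAccumulator> accumulators, final String name, final long n) {
        if (accumulators != null && accumulators.containsKey(name)) {
            accumulators.get(name).add(n);
        }
    }
}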
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index b61d5e7cc..26a9e9471 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -55,7 +55,7 @@ public class ConversionUtils {
             res.setLicense(BrokerConstants.OPEN_ACCESS);
             res.setHostedby(kvValue(i.getHostedby()));
             return res;
-        });
+        }, 20);
     }
 
     public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) {
@@ -75,8 +75,8 @@ public class ConversionUtils {
         res.setOpenaireId(d.getId());
         res.setOriginalId(first(d.getOriginalId()));
         res.setTitle(structPropValue(d.getTitle()));
-        res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
-        res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+        res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid, 20));
+        res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20));
         res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
         return res;
     }
@@ -90,8 +90,8 @@ public class ConversionUtils {
         res.setOpenaireId(p.getId());
         res.setOriginalId(first(p.getOriginalId()));
         res.setTitle(structPropValue(p.getTitle()));
-        res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
-        res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+        res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid, 20));
+        res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20));
         res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
         return res;
@@ -107,23 +107,25 @@ public class ConversionUtils {
         res.setOpenaireId(result.getId());
         res.setOriginalId(first(result.getOriginalId()));
         res.setTypology(classId(result.getResulttype()));
-        res.setTitles(structPropList(result.getTitle()));
-        res.setAbstracts(fieldList(result.getDescription()));
+        res.setTitles(structPropList(result.getTitle(), 10));
+        res.setAbstracts(fieldList(result.getDescription(), 10));
         res.setLanguage(classId(result.getLanguage()));
         res.setSubjects(structPropTypedList(result.getSubject()));
-        res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor));
+        res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor, 30));
         res.setPublicationdate(fieldValue(result.getDateofacceptance()));
         res.setPublisher(fieldValue(result.getPublisher()));
         res.setEmbargoenddate(fieldValue(result.getEmbargoenddate()));
-        res.setContributor(fieldList(result.getContributor()));
+        res.setContributor(fieldList(result.getContributor(), 20));
         res
             .setJournal(
                 result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal())
                     : null);
         res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
         res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
-        res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
-        res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
-        res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
+        res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid, 20));
+        res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances, 20));
+        res
+            .setExternalReferences(
+                mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef, 20));
         return res;
     }
@@ -243,18 +245,25 @@ public class ConversionUtils {
             : null;
     }
 
-    private static List<String> fieldList(final List<Field<String>> fl) {
+    private static List<String> fieldList(final List<Field<String>> fl, final long maxSize) {
         return fl != null
-            ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
+            ? fl
+                .stream()
+                .map(Field::getValue)
+                .map(s -> StringUtils.abbreviate(s, 3000)) // MAX 3000 CHARS
+                .filter(StringUtils::isNotBlank)
+                .limit(maxSize)
+                .collect(Collectors.toList())
             : new ArrayList<>();
     }
 
-    private static List<String> structPropList(final List<StructuredProperty> props) {
+    private static List<String> structPropList(final List<StructuredProperty> props, final long maxSize) {
         return props != null
             ? props
                 .stream()
                 .map(StructuredProperty::getValue)
                 .filter(StringUtils::isNotBlank)
+                .limit(maxSize)
                 .collect(Collectors.toList())
             : new ArrayList<>();
     }
@@ -271,7 +280,7 @@ public class ConversionUtils {
             .collect(Collectors.toList());
     }
 
-    private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
+    private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func, final long maxSize) {
         if (list == null) {
             return new ArrayList<>();
         }
@@ -280,10 +289,12 @@ public class ConversionUtils {
         return list
             .stream()
             .map(func::apply)
            .filter(Objects::nonNull)
+            .limit(maxSize)
            .collect(Collectors.toList());
     }
 
-    private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
+    private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func,
+        final long maxSize) {
         if (list == null) {
             return new ArrayList<>();
         }
@@ -293,6 +304,7 @@ public class ConversionUtils {
             .map(func::apply)
             .flatMap(List::stream)
             .filter(Objects::nonNull)
+            .limit(maxSize)
             .collect(Collectors.toList());
     }
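The new maxSize parameters plus the StringUtils.abbreviate call bound both the length of each stored string and the number of list items, so oversized records can no longer blow up the serialized events. The stream behaviour in isolation, plain commons-lang3 with no Spark (values are made up):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;

public class TruncationSketch {

    public static void main(final String[] args) {
        final List<String> values = Arrays
            .asList("kept as is", null, "", StringUtils.repeat("x", 5000), "also kept");

        // mirrors fieldList(fl, maxSize): truncate to 3000 chars, drop blanks, cap the list
        final List<String> out = values
            .stream()
            .map(s -> StringUtils.abbreviate(s, 3000)) // null-safe; truncated strings end with "..."
            .filter(StringUtils::isNotBlank)
            .limit(10)
            .collect(Collectors.toList());

        out.forEach(s -> System.out.println(s.length() + " chars"));
        // prints: 10 chars, 3000 chars, 9 chars
    }
}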
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index b8d12c42c..7667bfba7 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -378,9 +378,9 @@
             <class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
             <jar>dhp-broker-events-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores=${sparkExecutorCores}
                 --executor-memory=${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.dynamicAllocation.maxExecutors="2"
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
index f10c5d804..9128c9820 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml
@@ -78,9 +78,8 @@
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-    <start to="index_es"/>
-
-    <action name="generate_events">
+    <start to="generate_events"/>
+    <action name="generate_events">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
             <mode>cluster</mode>
@@ -101,31 +100,6 @@
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
     </action>
-
-    <action name="index_es">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>IndexOnESJob</name>
-            <class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
-            <jar>dhp-broker-events-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=3840
-            </spark-opts>
-            <arg>--workingPath</arg><arg>${workingPath}</arg>
-            <arg>--index</arg><arg>${esIndexName}</arg>
-            <arg>--esHost</arg><arg>${esIndexHost}</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
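Capping spark.dynamicAllocation.maxExecutors at 2 for the indexing action throttles how many concurrent writers hit the ElasticSearch cluster. The same setting expressed in code, as if launching the job outside Oozie (illustrative only; the workflow passes it via --conf on the spark action, and dynamic allocation also requires the external shuffle service on YARN):

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class ThrottledSessionSketch {

    public static void main(final String[] args) {
        final SparkConf conf = new SparkConf()
            .setAppName("IndexOnESJob")
            .set("spark.dynamicAllocation.enabled", "true")
            .set("spark.dynamicAllocation.maxExecutors", "2"); // cap concurrent writers against ES

        final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        // ... build the JSON RDD and write it to the index via the elasticsearch-hadoop connector
        spark.stop();
    }
}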
diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
new file mode 100644
index 000000000..93bc5617f
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java
@@ -0,0 +1,125 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
+import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
+
+class UpdateMatcherTest {
+
+    UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
+
+    @BeforeEach
+    void setUp() throws Exception {
+    }
+
+    @Test
+    void testSearchUpdatesForRecord_1() {
+        final OaBrokerMainEntity res = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
+
+        final Collection<UpdateInfo<String>> list = matcher
+            .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testSearchUpdatesForRecord_2() {
+        final OaBrokerMainEntity res = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
+
+        res.setPublicationdate("2018");
+
+        final Collection<UpdateInfo<String>> list = matcher
+            .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testSearchUpdatesForRecord_3() {
+        final OaBrokerMainEntity res = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
+
+        p2.setPublicationdate("2018");
+
+        final Collection<UpdateInfo<String>> list = matcher
+            .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+
+        assertTrue(list.size() == 1);
+    }
+
+    @Test
+    void testSearchUpdatesForRecord_4() {
+        final OaBrokerMainEntity res = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
+
+        res.setPublicationdate("2018");
+        p2.setPublicationdate("2018");
+
+        final Collection<UpdateInfo<String>> list = matcher
+            .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testSearchUpdatesForRecord_5() {
+        final OaBrokerMainEntity res = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
+        res.setPublicationdate("2018");
+        p1.setPublicationdate("2018");
+        p2.setPublicationdate("2018");
+        p3.setPublicationdate("2018");
+        p4.setPublicationdate("2018");
+
+        final Collection<UpdateInfo<String>> list = matcher
+            .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testSearchUpdatesForRecord_6() {
+        final OaBrokerMainEntity res = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p1 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p2 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p3 = new OaBrokerMainEntity();
+        final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
+
+        p1.setPublicationdate("2018");
+        p2.setPublicationdate("2018");
+        p3.setPublicationdate("2018");
+        p4.setPublicationdate("2018");
+
+        final Collection<UpdateInfo<String>> list = matcher
+            .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null);
+
+        assertTrue(list.size() == 1);
+    }
+
+}
diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java
new file mode 100644
index 000000000..77a19af4c
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingPublicationDateTest.java
@@ -0,0 +1,57 @@
+
+package eu.dnetlib.dhp.broker.oa.matchers.simple;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.broker.objects.OaBrokerMainEntity;
+
+class EnrichMissingPublicationDateTest {
+
+    final EnrichMissingPublicationDate matcher = new EnrichMissingPublicationDate();
+
+    @BeforeEach
+    void setUp() throws Exception {
+    }
+
+    @Test
+    void testFindDifferences_1() {
+        final OaBrokerMainEntity source = new OaBrokerMainEntity();
+        final OaBrokerMainEntity target = new OaBrokerMainEntity();
+        final List<String> list = matcher.findDifferences(source, target);
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testFindDifferences_2() {
+        final OaBrokerMainEntity source = new OaBrokerMainEntity();
+        final OaBrokerMainEntity target = new OaBrokerMainEntity();
+        source.setPublicationdate("2018");
+        final List<String> list = matcher.findDifferences(source, target);
+        assertTrue(list.size() == 1);
+    }
+
+    @Test
+    void testFindDifferences_3() {
+        final OaBrokerMainEntity source = new OaBrokerMainEntity();
+        final OaBrokerMainEntity target = new OaBrokerMainEntity();
+        target.setPublicationdate("2018");
+        final List<String> list = matcher.findDifferences(source, target);
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    void testFindDifferences_4() {
+        final OaBrokerMainEntity source = new OaBrokerMainEntity();
+        final OaBrokerMainEntity target = new OaBrokerMainEntity();
+        source.setPublicationdate("2018");
+        target.setPublicationdate("2018");
+        final List<String> list = matcher.findDifferences(source, target);
+        assertTrue(list.isEmpty());
+    }
+
+}