From 262c29463e19d696899d3e6a730cb9bbf7a19d26 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 15 Jul 2020 09:18:40 +0200 Subject: [PATCH] relations with multiple datasources --- .../dhp/broker/model/EventFactory.java | 24 ++- .../dnetlib/dhp/broker/oa/JoinStep0Job.java | 20 +- .../oa/PrepareRelatedDatasourcesJob.java | 55 ++++- .../dhp/broker/oa/matchers/UpdateMatcher.java | 10 +- .../dhp/broker/oa/util/BrokerConstants.java | 4 + .../dhp/broker/oa/util/ConversionUtils.java | 16 +- .../util/DatasourceRelationsAccumulator.java | 68 ++++++ .../dhp/broker/oa/util/EventFinder.java | 17 +- .../dhp/broker/oa/util/UpdateInfo.java | 25 ++- .../withRels/RelatedDatasource.java | 42 ++++ ....java => RelatedDatasourceAggregator.java} | 24 ++- .../withRels/SimpleDatasourceInfo.java | 40 ---- .../oa/generate_all/oozie_app/workflow.xml | 24 +++ .../broker/oa/partial/oozie_app/workflow.xml | 199 +++++++++++++++++- .../broker/oa/matchers/UpdateMatcherTest.java | 20 +- pom.xml | 2 +- 16 files changed, 495 insertions(+), 95 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/{AddDatasourceTypeAggregator.java => RelatedDatasourceAggregator.java} (55%) delete mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 49e750698..4a58cfd36 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -11,6 +11,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; public class EventFactory { @@ -52,9 +54,11 @@ public class EventFactory { final OaBrokerMainEntity source = updateInfo.getSource(); final OaBrokerMainEntity target = updateInfo.getTarget(); - map.setTargetDatasourceId(target.getCollectedFromId()); - map.setTargetDatasourceName(target.getCollectedFromName()); - map.setTargetDatasourceType(target.getCollectedFromType()); + final OaBrokerRelatedDatasource targetDs = updateInfo.getTargetDs(); + + map.setTargetDatasourceId(targetDs.getOpenaireId()); + map.setTargetDatasourceName(targetDs.getName()); + map.setTargetDatasourceType(targetDs.getType()); map.setTargetResultId(target.getOpenaireId()); @@ -73,11 +77,19 @@ public class EventFactory { // PROVENANCE INFO map.setTrust(updateInfo.getTrust()); - map.setProvenanceDatasourceId(source.getCollectedFromId()); - map.setProvenanceDatasourceName(source.getCollectedFromName()); - map.setProvenanceDatasourceType(source.getCollectedFromType()); map.setProvenanceResultId(source.getOpenaireId()); + source + .getDatasources() + .stream() + .filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL)) + .findFirst() + .ifPresent(ds -> { + map.setProvenanceDatasourceId(ds.getOpenaireId()); + map.setProvenanceDatasourceName(ds.getName()); + map.setProvenanceDatasourceType(ds.getType()); + }); + return map; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java index eb1825fa5..39fa76e43 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java @@ -17,8 +17,8 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasourceAggregator; import scala.Tuple2; public class JoinStep0Job { @@ -45,33 +45,33 @@ public class JoinStep0Job { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); - final String outputPath = workingPath + "/joinedEntities_step0"; - log.info("outputPath: {}", outputPath); + final String joinedEntitiesPath = workingPath + "/joinedEntities_step0"; + log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - ClusterUtils.removeDir(spark, outputPath); + ClusterUtils.removeDir(spark, joinedEntitiesPath); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); - final Dataset datasources = ClusterUtils - .readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class); + final Dataset typedRels = ClusterUtils + .readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class); - final TypedColumn, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator() + final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator() .toColumn(); final Dataset dataset = sources - .joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner") + .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); - ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total); + ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java index 30f5ddac3..166372a7f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java @@ -9,14 +9,23 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.broker.oa.util.DatasourceRelationsAccumulator; +import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource; import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; +import scala.Tuple3; public class PrepareRelatedDatasourcesJob { @@ -42,7 +51,7 @@ public class PrepareRelatedDatasourcesJob { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); - final String relsPath = workingPath + "/datasources"; + final String relsPath = workingPath + "/relatedDatasources"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); @@ -53,16 +62,46 @@ public class PrepareRelatedDatasourcesJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources"); - final Dataset dataset = ClusterUtils - .readPath(spark, graphPath + "/datasource", Datasource.class) - .map( - ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()), - Encoders.bean(SimpleDatasourceInfo.class)); + final Dataset> rels = prepareResultTuples( + spark, graphPath, Publication.class) + .union(prepareResultTuples(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class)) + .union(prepareResultTuples(spark, graphPath, Software.class)) + .union(prepareResultTuples(spark, graphPath, OtherResearchProduct.class)); - ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total); + final Dataset datasources = ClusterUtils + .readPath(spark, graphPath + "/datasource", Datasource.class) + .map(ConversionUtils::oafDatasourceToBrokerDatasource, Encoders.bean(OaBrokerRelatedDatasource.class)); + + final Dataset dataset = rels + .joinWith(datasources, datasources.col("openaireId").equalTo(rels.col("_2")), "inner") + .map(t -> { + final RelatedDatasource r = new RelatedDatasource(); + r.setSource(t._1._1()); + r.setRelDatasource(t._2); + r.getRelDatasource().setRelType(t._1._3()); + return r; + }, Encoders.bean(RelatedDatasource.class)); + + ClusterUtils.save(dataset, relsPath, RelatedDatasource.class, total); }); } + private static final Dataset> prepareResultTuples(final SparkSession spark, + final String graphPath, + final Class sourceClass) { + + return ClusterUtils + .readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) + .filter(r -> !ClusterUtils.isDedupRoot(r.getId())) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map( + r -> DatasourceRelationsAccumulator.calculateTuples(r), + Encoders.bean(DatasourceRelationsAccumulator.class)) + .flatMap( + acc -> acc.getRels().iterator(), + Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())); + } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index 3d688fa1d..fba82aa8c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.spark.util.LongAccumulator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; @@ -34,18 +35,19 @@ public abstract class UpdateMatcher { this.highlightToStringFunction = highlightToStringFunction; } - public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res, + public Collection> searchUpdatesForRecord(final OaBrokerMainEntity target, + final OaBrokerRelatedDatasource targetDs, final Collection others, final Map accumulators) { final Map> infoMap = new HashMap<>(); for (final OaBrokerMainEntity source : others) { - if (source != res) { - for (final T hl : findDifferences(source, res)) { + if (source != target) { + for (final T hl : findDifferences(source, target)) { final Topic topic = getTopicFunction().apply(hl); if (topic != null) { - final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res, + final UpdateInfo info = new UpdateInfo<>(topic, hl, source, target, targetDs, getCompileHighlightFunction(), getHighlightToStringFunction()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java index 5308b9dff..7a09862d8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/BrokerConstants.java @@ -14,6 +14,10 @@ public class BrokerConstants { public static final String OPEN_ACCESS = "OPEN"; public static final String IS_MERGED_IN_CLASS = "isMergedIn"; + public static final String COLLECTED_FROM_REL = "collectedFrom"; + + public static final String HOSTED_BY_REL = "hostedBy"; + public static final float MIN_TRUST = 0.25f; public static final float MAX_TRUST = 1.00f; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index d00c5b817..053627a5f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -22,11 +22,13 @@ import eu.dnetlib.broker.objects.OaBrokerJournal; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.ExternalReference; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Instance; @@ -119,8 +121,6 @@ public class ConversionUtils { res .setJournal( result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); - res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey)); - res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res @@ -223,6 +223,18 @@ public class ConversionUtils { return res; } + public static final OaBrokerRelatedDatasource oafDatasourceToBrokerDatasource(final Datasource ds) { + if (ds == null) { + return null; + } + + final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource(); + res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname()))); + res.setOpenaireId(ds.getId()); + res.setType(classId(ds.getDatasourcetype())); + return res; + } + private static String first(final List list) { return list != null && list.size() > 0 ? list.get(0) : null; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java new file mode 100644 index 000000000..75c4625ce --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java @@ -0,0 +1,68 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.schema.oaf.Result; +import scala.Tuple3; + +public class DatasourceRelationsAccumulator implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 3256220670651218957L; + + private List> rels = new ArrayList<>(); + + public List> getRels() { + return rels; + } + + public void setRels(final List> rels) { + this.rels = rels; + } + + protected void addTuple(final Tuple3 t) { + rels.add(t); + } + + public static final DatasourceRelationsAccumulator calculateTuples(final Result r) { + + final Set collectedFromSet = r + .getCollectedfrom() + .stream() + .map(kv -> kv.getKey()) + .filter(StringUtils::isNotBlank) + .distinct() + .collect(Collectors.toSet()); + + final Set hostedBySet = r + .getInstance() + .stream() + .map(i -> i.getHostedby()) + .filter(Objects::nonNull) + .filter(kv -> !StringUtils.equalsIgnoreCase(kv.getValue(), "Unknown Repository")) + .map(kv -> kv.getKey()) + .filter(StringUtils::isNotBlank) + .distinct() + .filter(id -> !collectedFromSet.contains(id)) + .collect(Collectors.toSet()); + + final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator(); + collectedFromSet + .stream() + .map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL)) + .forEach(res::addTuple); + hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple); + return res; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index b6328eb95..1ab56cc34 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; @@ -80,9 +81,11 @@ public class EventFinder { final List> list = new ArrayList<>(); for (final OaBrokerMainEntity target : results.getData()) { - if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { - for (final UpdateMatcher matcher : matchers) { - list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators)); + for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) { + if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { + for (final UpdateMatcher matcher : matchers) { + list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators)); + } } } } @@ -90,17 +93,17 @@ public class EventFinder { return asEventGroup(list); } - private static boolean verifyTarget(final OaBrokerMainEntity target, + private static boolean verifyTarget(final OaBrokerRelatedDatasource target, final Set dsIdWhitelist, final Set dsIdBlacklist, final Set dsTypeWhitelist) { - if (dsIdWhitelist.contains(target.getCollectedFromId())) { + if (dsIdWhitelist.contains(target.getOpenaireId())) { return true; - } else if (dsIdBlacklist.contains(target.getCollectedFromId())) { + } else if (dsIdBlacklist.contains(target.getOpenaireId())) { return false; } else { - return dsTypeWhitelist.contains(target.getCollectedFromType()); + return dsTypeWhitelist.contains(target.getType()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index ef8fb240c..fca954247 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -8,6 +8,7 @@ import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.broker.objects.OaBrokerInstance; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerProvenance; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; import eu.dnetlib.dhp.broker.model.Topic; public final class UpdateInfo { @@ -20,6 +21,8 @@ public final class UpdateInfo { private final OaBrokerMainEntity target; + private final OaBrokerRelatedDatasource targetDs; + private final BiConsumer compileHighlight; private final Function highlightToString; @@ -28,12 +31,14 @@ public final class UpdateInfo { public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, final OaBrokerMainEntity target, + final OaBrokerRelatedDatasource targetDs, final BiConsumer compileHighlight, final Function highlightToString) { this.topic = topic; this.highlightValue = highlightValue; this.source = source; this.target = target; + this.targetDs = targetDs; this.compileHighlight = compileHighlight; this.highlightToString = highlightToString; this.trust = TrustUtils.calculateTrust(source, target); @@ -51,6 +56,10 @@ public final class UpdateInfo { return target; } + public OaBrokerRelatedDatasource getTargetDs() { + return targetDs; + } + protected Topic getTopic() { return topic; } @@ -75,8 +84,20 @@ public final class UpdateInfo { compileHighlight.accept(hl, getHighlightValue()); final String provId = getSource().getOpenaireId(); - final String provRepo = getSource().getCollectedFromName(); - final String provType = getSource().getCollectedFromType(); + final String provRepo = getSource() + .getDatasources() + .stream() + .filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL)) + .map(ds -> ds.getName()) + .findFirst() + .orElse(""); + final String provType = getSource() + .getDatasources() + .stream() + .filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL)) + .map(ds -> ds.getType()) + .findFirst() + .orElse(""); final String provUrl = getSource() .getInstances() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java new file mode 100644 index 000000000..a27df502b --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasource.java @@ -0,0 +1,42 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; + +import java.io.Serializable; + +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; + +public class RelatedDatasource implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 3015550240920424010L; + + private String source; + private OaBrokerRelatedDatasource relDatasource; + + public RelatedDatasource() { + } + + public RelatedDatasource(final String source, final OaBrokerRelatedDatasource relDatasource) { + this.source = source; + this.relDatasource = relDatasource; + } + + public String getSource() { + return source; + } + + public void setSource(final String source) { + this.source = source; + } + + public OaBrokerRelatedDatasource getRelDatasource() { + return relDatasource; + } + + public void setRelDatasource(final OaBrokerRelatedDatasource relDatasource) { + this.relDatasource = relDatasource; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasourceAggregator.java similarity index 55% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasourceAggregator.java index ccd15c8c6..2c0c7917d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/AddDatasourceTypeAggregator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedDatasourceAggregator.java @@ -7,15 +7,16 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.expressions.Aggregator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import scala.Tuple2; -public class AddDatasourceTypeAggregator - extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { +public class RelatedDatasourceAggregator + extends Aggregator, OaBrokerMainEntity, OaBrokerMainEntity> { /** * */ - private static final long serialVersionUID = 8788588975496014728L; + private static final long serialVersionUID = -7212121913834713672L; @Override public OaBrokerMainEntity zero() { @@ -29,10 +30,10 @@ public class AddDatasourceTypeAggregator @Override public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, - final Tuple2 t) { + final Tuple2 t) { final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; - if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) { - res.setCollectedFromType(t._2.getType()); + if (t._2 != null && res.getDatasources().size() < BrokerConstants.MAX_NUMBER_OF_RELS) { + res.getDatasources().add(t._2.getRelDatasource()); } return res; @@ -40,7 +41,15 @@ public class AddDatasourceTypeAggregator @Override public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { - if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) { + if (StringUtils.isNotBlank(g1.getOpenaireId())) { + final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasources().size(); + if (availables > 0) { + if (g2.getDatasources().size() <= availables) { + g1.getDatasources().addAll(g2.getDatasources()); + } else { + g1.getDatasources().addAll(g2.getDatasources().subList(0, availables)); + } + } return g1; } else { return g2; @@ -56,4 +65,5 @@ public class AddDatasourceTypeAggregator public Encoder outputEncoder() { return Encoders.bean(OaBrokerMainEntity.class); } + } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java deleted file mode 100644 index 966f63fa0..000000000 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/SimpleDatasourceInfo.java +++ /dev/null @@ -1,40 +0,0 @@ - -package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; - -import java.io.Serializable; - -public class SimpleDatasourceInfo implements Serializable { - - /** - * - */ - private static final long serialVersionUID = 2996609859416024734L; - - private String id; - private String type; - - public SimpleDatasourceInfo() { - } - - public SimpleDatasourceInfo(final String id, final String type) { - this.id = id; - this.type = type; - } - - public String getId() { - return id; - } - - public void setId(final String id) { - this.id = id; - } - - public String getType() { - return type; - } - - public void setType(final String type) { - this.type = type; - } - -} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 2c728cd98..568d5dc5a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -448,6 +448,30 @@ --index${esIndexName} --esHost${esIndexHost} + + + + + + + yarn + cluster + GenerateStatsJob + eu.dnetlib.dhp.broker.oa.GenerateStatsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index b4155f93f..2271a9e0e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -64,14 +64,209 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + yarn + cluster + JoinStep0 + eu.dnetlib.dhp.broker.oa.JoinStep0Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep1 + eu.dnetlib.dhp.broker.oa.JoinStep1Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep2 + eu.dnetlib.dhp.broker.oa.JoinStep2Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep3 + eu.dnetlib.dhp.broker.oa.JoinStep3Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + JoinStep4 + eu.dnetlib.dhp.broker.oa.JoinStep4Job + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + - + + + yarn + cluster + PrepareGroupsJob + eu.dnetlib.dhp.broker.oa.PrepareGroupsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --graphPath${graphInputPath} + --workingPath${workingPath} + + + + + + + + yarn + cluster + GenerateEventsJob + eu.dnetlib.dhp.broker.oa.GenerateEventsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --datasourceIdWhitelist${datasourceIdWhitelist} + --datasourceTypeWhitelist${datasourceTypeWhitelist} + --datasourceIdBlacklist${datasourceIdBlacklist} + + + + + + + + yarn + cluster + IndexOnESJob + eu.dnetlib.dhp.broker.oa.IndexOnESJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --workingPath${workingPath} + --index${esIndexName} + --esHost${esIndexHost} + + + + + + yarn cluster diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java index 82374b335..8fa95abe5 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java @@ -8,15 +8,23 @@ import java.util.Collection; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +@ExtendWith(MockitoExtension.class) class UpdateMatcherTest { UpdateMatcher matcher = new EnrichMissingPublicationDate(); + @Mock + private OaBrokerRelatedDatasource targetDs; + @BeforeEach void setUp() throws Exception { } @@ -30,7 +38,7 @@ class UpdateMatcherTest { final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); + .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -46,7 +54,7 @@ class UpdateMatcherTest { res.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); + .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -62,7 +70,7 @@ class UpdateMatcherTest { p2.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); + .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.size() == 1); } @@ -79,7 +87,7 @@ class UpdateMatcherTest { p2.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); + .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -98,7 +106,7 @@ class UpdateMatcherTest { p4.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); + .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -117,7 +125,7 @@ class UpdateMatcherTest { p4.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); + .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.size() == 1); } diff --git a/pom.xml b/pom.xml index 411ef9521..eec6c303f 100644 --- a/pom.xml +++ b/pom.xml @@ -624,6 +624,6 @@ 3.3.3 3.4.2 [2.12,3.0) - 3.0.0 + 3.1.0