1
0
Fork 0

relations with multiple datasources

This commit is contained in:
Michele Artini 2020-07-15 09:18:40 +02:00
parent 7d6e269b40
commit 262c29463e
16 changed files with 495 additions and 95 deletions

View File

@ -11,6 +11,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory { public class EventFactory {
@ -52,9 +54,11 @@ public class EventFactory {
final OaBrokerMainEntity source = updateInfo.getSource(); final OaBrokerMainEntity source = updateInfo.getSource();
final OaBrokerMainEntity target = updateInfo.getTarget(); final OaBrokerMainEntity target = updateInfo.getTarget();
map.setTargetDatasourceId(target.getCollectedFromId()); final OaBrokerRelatedDatasource targetDs = updateInfo.getTargetDs();
map.setTargetDatasourceName(target.getCollectedFromName());
map.setTargetDatasourceType(target.getCollectedFromType()); map.setTargetDatasourceId(targetDs.getOpenaireId());
map.setTargetDatasourceName(targetDs.getName());
map.setTargetDatasourceType(targetDs.getType());
map.setTargetResultId(target.getOpenaireId()); map.setTargetResultId(target.getOpenaireId());
@ -73,11 +77,19 @@ public class EventFactory {
// PROVENANCE INFO // PROVENANCE INFO
map.setTrust(updateInfo.getTrust()); map.setTrust(updateInfo.getTrust());
map.setProvenanceDatasourceId(source.getCollectedFromId());
map.setProvenanceDatasourceName(source.getCollectedFromName());
map.setProvenanceDatasourceType(source.getCollectedFromType());
map.setProvenanceResultId(source.getOpenaireId()); map.setProvenanceResultId(source.getOpenaireId());
source
.getDatasources()
.stream()
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
.findFirst()
.ifPresent(ds -> {
map.setProvenanceDatasourceId(ds.getOpenaireId());
map.setProvenanceDatasourceName(ds.getName());
map.setProvenanceDatasourceType(ds.getType());
});
return map; return map;
} }

View File

@ -17,8 +17,8 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.AddDatasourceTypeAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasourceAggregator;
import scala.Tuple2; import scala.Tuple2;
public class JoinStep0Job { public class JoinStep0Job {
@ -45,33 +45,33 @@ public class JoinStep0Job {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final String outputPath = workingPath + "/joinedEntities_step0"; final String joinedEntitiesPath = workingPath + "/joinedEntities_step0";
log.info("outputPath: {}", outputPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, outputPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<SimpleDatasourceInfo> datasources = ClusterUtils final Dataset<RelatedDatasource> typedRels = ClusterUtils
.readPath(spark, workingPath + "/datasources", SimpleDatasourceInfo.class); .readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity> aggr = new AddDatasourceTypeAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
.toColumn(); .toColumn();
final Dataset<OaBrokerMainEntity> dataset = sources final Dataset<OaBrokerMainEntity> dataset = sources
.joinWith(datasources, sources.col("collectedFromId").equalTo(datasources.col("id")), "inner") .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING()) .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
ClusterUtils.save(dataset, outputPath, OaBrokerMainEntity.class, total); ClusterUtils.save(dataset, joinedEntitiesPath, OaBrokerMainEntity.class, total);
}); });

View File

@ -9,14 +9,23 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.SimpleDatasourceInfo; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.DatasourceRelationsAccumulator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasource;
import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import scala.Tuple3;
public class PrepareRelatedDatasourcesJob { public class PrepareRelatedDatasourcesJob {
@ -42,7 +51,7 @@ public class PrepareRelatedDatasourcesJob {
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/datasources"; final String relsPath = workingPath + "/relatedDatasources";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -53,16 +62,46 @@ public class PrepareRelatedDatasourcesJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_datasources");
final Dataset<SimpleDatasourceInfo> dataset = ClusterUtils final Dataset<Tuple3<String, String, String>> rels = prepareResultTuples(
.readPath(spark, graphPath + "/datasource", Datasource.class) spark, graphPath, Publication.class)
.map( .union(prepareResultTuples(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
ds -> new SimpleDatasourceInfo(ds.getId(), ds.getDatasourcetype().getClassid()), .union(prepareResultTuples(spark, graphPath, Software.class))
Encoders.bean(SimpleDatasourceInfo.class)); .union(prepareResultTuples(spark, graphPath, OtherResearchProduct.class));
ClusterUtils.save(dataset, relsPath, SimpleDatasourceInfo.class, total); final Dataset<OaBrokerRelatedDatasource> datasources = ClusterUtils
.readPath(spark, graphPath + "/datasource", Datasource.class)
.map(ConversionUtils::oafDatasourceToBrokerDatasource, Encoders.bean(OaBrokerRelatedDatasource.class));
final Dataset<RelatedDatasource> dataset = rels
.joinWith(datasources, datasources.col("openaireId").equalTo(rels.col("_2")), "inner")
.map(t -> {
final RelatedDatasource r = new RelatedDatasource();
r.setSource(t._1._1());
r.setRelDatasource(t._2);
r.getRelDatasource().setRelType(t._1._3());
return r;
}, Encoders.bean(RelatedDatasource.class));
ClusterUtils.save(dataset, relsPath, RelatedDatasource.class, total);
}); });
} }
private static final Dataset<Tuple3<String, String, String>> prepareResultTuples(final SparkSession spark,
final String graphPath,
final Class<? extends Result> sourceClass) {
return ClusterUtils
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> !ClusterUtils.isDedupRoot(r.getId()))
.filter(r -> r.getDataInfo().getDeletedbyinference())
.map(
r -> DatasourceRelationsAccumulator.calculateTuples(r),
Encoders.bean(DatasourceRelationsAccumulator.class))
.flatMap(
acc -> acc.getRels().iterator(),
Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));
}
} }

View File

@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
@ -34,18 +35,19 @@ public abstract class UpdateMatcher<T> {
this.highlightToStringFunction = highlightToStringFunction; this.highlightToStringFunction = highlightToStringFunction;
} }
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res, public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity target,
final OaBrokerRelatedDatasource targetDs,
final Collection<OaBrokerMainEntity> others, final Collection<OaBrokerMainEntity> others,
final Map<String, LongAccumulator> accumulators) { final Map<String, LongAccumulator> accumulators) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>(); final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
for (final OaBrokerMainEntity source : others) { for (final OaBrokerMainEntity source : others) {
if (source != res) { if (source != target) {
for (final T hl : findDifferences(source, res)) { for (final T hl : findDifferences(source, target)) {
final Topic topic = getTopicFunction().apply(hl); final Topic topic = getTopicFunction().apply(hl);
if (topic != null) { if (topic != null) {
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, target, targetDs,
getCompileHighlightFunction(), getCompileHighlightFunction(),
getHighlightToStringFunction()); getHighlightToStringFunction());

View File

@ -14,6 +14,10 @@ public class BrokerConstants {
public static final String OPEN_ACCESS = "OPEN"; public static final String OPEN_ACCESS = "OPEN";
public static final String IS_MERGED_IN_CLASS = "isMergedIn"; public static final String IS_MERGED_IN_CLASS = "isMergedIn";
public static final String COLLECTED_FROM_REL = "collectedFrom";
public static final String HOSTED_BY_REL = "hostedBy";
public static final float MIN_TRUST = 0.25f; public static final float MIN_TRUST = 0.25f;
public static final float MAX_TRUST = 1.00f; public static final float MAX_TRUST = 1.00f;

View File

@ -22,11 +22,13 @@ import eu.dnetlib.broker.objects.OaBrokerJournal;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProject; import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.broker.objects.OaBrokerTypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.ExternalReference; import eu.dnetlib.dhp.schema.oaf.ExternalReference;
import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.Instance;
@ -119,8 +121,6 @@ public class ConversionUtils {
res res
.setJournal( .setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances)); res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
res res
@ -223,6 +223,18 @@ public class ConversionUtils {
return res; return res;
} }
public static final OaBrokerRelatedDatasource oafDatasourceToBrokerDatasource(final Datasource ds) {
if (ds == null) {
return null;
}
final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource();
res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname())));
res.setOpenaireId(ds.getId());
res.setType(classId(ds.getDatasourcetype()));
return res;
}
private static String first(final List<String> list) { private static String first(final List<String> list) {
return list != null && list.size() > 0 ? list.get(0) : null; return list != null && list.size() > 0 ? list.get(0) : null;
} }

View File

@ -0,0 +1,68 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple3;
public class DatasourceRelationsAccumulator implements Serializable {
/**
*
*/
private static final long serialVersionUID = 3256220670651218957L;
private List<Tuple3<String, String, String>> rels = new ArrayList<>();
public List<Tuple3<String, String, String>> getRels() {
return rels;
}
public void setRels(final List<Tuple3<String, String, String>> rels) {
this.rels = rels;
}
protected void addTuple(final Tuple3<String, String, String> t) {
rels.add(t);
}
public static final DatasourceRelationsAccumulator calculateTuples(final Result r) {
final Set<String> collectedFromSet = r
.getCollectedfrom()
.stream()
.map(kv -> kv.getKey())
.filter(StringUtils::isNotBlank)
.distinct()
.collect(Collectors.toSet());
final Set<String> hostedBySet = r
.getInstance()
.stream()
.map(i -> i.getHostedby())
.filter(Objects::nonNull)
.filter(kv -> !StringUtils.equalsIgnoreCase(kv.getValue(), "Unknown Repository"))
.map(kv -> kv.getKey())
.filter(StringUtils::isNotBlank)
.distinct()
.filter(id -> !collectedFromSet.contains(id))
.collect(Collectors.toSet());
final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
collectedFromSet
.stream()
.map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL))
.forEach(res::addTuple);
hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
return res;
}
}

View File

@ -11,6 +11,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
@ -80,9 +81,11 @@ public class EventFinder {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OaBrokerMainEntity target : results.getData()) { for (final OaBrokerMainEntity target : results.getData()) {
if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
for (final UpdateMatcher<?> matcher : matchers) { if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators)); for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
}
} }
} }
} }
@ -90,17 +93,17 @@ public class EventFinder {
return asEventGroup(list); return asEventGroup(list);
} }
private static boolean verifyTarget(final OaBrokerMainEntity target, private static boolean verifyTarget(final OaBrokerRelatedDatasource target,
final Set<String> dsIdWhitelist, final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist, final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist) { final Set<String> dsTypeWhitelist) {
if (dsIdWhitelist.contains(target.getCollectedFromId())) { if (dsIdWhitelist.contains(target.getOpenaireId())) {
return true; return true;
} else if (dsIdBlacklist.contains(target.getCollectedFromId())) { } else if (dsIdBlacklist.contains(target.getOpenaireId())) {
return false; return false;
} else { } else {
return dsTypeWhitelist.contains(target.getCollectedFromType()); return dsTypeWhitelist.contains(target.getType());
} }
} }

View File

@ -8,6 +8,7 @@ import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OaBrokerInstance; import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProvenance; import eu.dnetlib.broker.objects.OaBrokerProvenance;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
public final class UpdateInfo<T> { public final class UpdateInfo<T> {
@ -20,6 +21,8 @@ public final class UpdateInfo<T> {
private final OaBrokerMainEntity target; private final OaBrokerMainEntity target;
private final OaBrokerRelatedDatasource targetDs;
private final BiConsumer<OaBrokerMainEntity, T> compileHighlight; private final BiConsumer<OaBrokerMainEntity, T> compileHighlight;
private final Function<T, String> highlightToString; private final Function<T, String> highlightToString;
@ -28,12 +31,14 @@ public final class UpdateInfo<T> {
public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
final OaBrokerMainEntity target, final OaBrokerMainEntity target,
final OaBrokerRelatedDatasource targetDs,
final BiConsumer<OaBrokerMainEntity, T> compileHighlight, final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
final Function<T, String> highlightToString) { final Function<T, String> highlightToString) {
this.topic = topic; this.topic = topic;
this.highlightValue = highlightValue; this.highlightValue = highlightValue;
this.source = source; this.source = source;
this.target = target; this.target = target;
this.targetDs = targetDs;
this.compileHighlight = compileHighlight; this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString; this.highlightToString = highlightToString;
this.trust = TrustUtils.calculateTrust(source, target); this.trust = TrustUtils.calculateTrust(source, target);
@ -51,6 +56,10 @@ public final class UpdateInfo<T> {
return target; return target;
} }
public OaBrokerRelatedDatasource getTargetDs() {
return targetDs;
}
protected Topic getTopic() { protected Topic getTopic() {
return topic; return topic;
} }
@ -75,8 +84,20 @@ public final class UpdateInfo<T> {
compileHighlight.accept(hl, getHighlightValue()); compileHighlight.accept(hl, getHighlightValue());
final String provId = getSource().getOpenaireId(); final String provId = getSource().getOpenaireId();
final String provRepo = getSource().getCollectedFromName(); final String provRepo = getSource()
final String provType = getSource().getCollectedFromType(); .getDatasources()
.stream()
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
.map(ds -> ds.getName())
.findFirst()
.orElse("");
final String provType = getSource()
.getDatasources()
.stream()
.filter(ds -> ds.getRelType().equals(BrokerConstants.COLLECTED_FROM_REL))
.map(ds -> ds.getType())
.findFirst()
.orElse("");
final String provUrl = getSource() final String provUrl = getSource()
.getInstances() .getInstances()

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
public class RelatedDatasource implements Serializable {
/**
*
*/
private static final long serialVersionUID = 3015550240920424010L;
private String source;
private OaBrokerRelatedDatasource relDatasource;
public RelatedDatasource() {
}
public RelatedDatasource(final String source, final OaBrokerRelatedDatasource relDatasource) {
this.source = source;
this.relDatasource = relDatasource;
}
public String getSource() {
return source;
}
public void setSource(final String source) {
this.source = source;
}
public OaBrokerRelatedDatasource getRelDatasource() {
return relDatasource;
}
public void setRelDatasource(final OaBrokerRelatedDatasource relDatasource) {
this.relDatasource = relDatasource;
}
}

View File

@ -7,15 +7,16 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import scala.Tuple2; import scala.Tuple2;
public class AddDatasourceTypeAggregator public class RelatedDatasourceAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo>, OaBrokerMainEntity, OaBrokerMainEntity> { extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity, OaBrokerMainEntity> {
/** /**
* *
*/ */
private static final long serialVersionUID = 8788588975496014728L; private static final long serialVersionUID = -7212121913834713672L;
@Override @Override
public OaBrokerMainEntity zero() { public OaBrokerMainEntity zero() {
@ -29,10 +30,10 @@ public class AddDatasourceTypeAggregator
@Override @Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
final Tuple2<OaBrokerMainEntity, SimpleDatasourceInfo> t) { final Tuple2<OaBrokerMainEntity, RelatedDatasource> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1; final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null && StringUtils.isNotBlank(t._2.getType())) { if (t._2 != null && res.getDatasources().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
res.setCollectedFromType(t._2.getType()); res.getDatasources().add(t._2.getRelDatasource());
} }
return res; return res;
@ -40,7 +41,15 @@ public class AddDatasourceTypeAggregator
@Override @Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) { public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId()) && StringUtils.isNotBlank(g1.getCollectedFromType())) { if (StringUtils.isNotBlank(g1.getOpenaireId())) {
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasources().size();
if (availables > 0) {
if (g2.getDatasources().size() <= availables) {
g1.getDatasources().addAll(g2.getDatasources());
} else {
g1.getDatasources().addAll(g2.getDatasources().subList(0, availables));
}
}
return g1; return g1;
} else { } else {
return g2; return g2;
@ -56,4 +65,5 @@ public class AddDatasourceTypeAggregator
public Encoder<OaBrokerMainEntity> outputEncoder() { public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class); return Encoders.bean(OaBrokerMainEntity.class);
} }
} }

View File

@ -1,40 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable;
public class SimpleDatasourceInfo implements Serializable {
/**
*
*/
private static final long serialVersionUID = 2996609859416024734L;
private String id;
private String type;
public SimpleDatasourceInfo() {
}
public SimpleDatasourceInfo(final String id, final String type) {
this.id = id;
this.type = type;
}
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(final String type) {
this.type = type;
}
}

View File

@ -448,6 +448,30 @@
<arg>--index</arg><arg>${esIndexName}</arg> <arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark> </spark>
<ok to="stats"/>
<error to="Kill"/>
</action>
<action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateStatsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateStatsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -64,14 +64,209 @@
</configuration> </configuration>
</global> </global>
<start to="stats"/> <start to="join_entities_step0"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="join_entities_step0">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep0</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep0Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step1"/>
<error to="Kill"/>
</action>
<action name="join_entities_step1">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep1</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step2"/>
<error to="Kill"/>
</action>
<action name="join_entities_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep2</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step3"/>
<error to="Kill"/>
</action>
<action name="join_entities_step3">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep3</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities_step4"/>
<error to="Kill"/>
</action>
<action name="join_entities_step4">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinStep4</name>
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<action name="stats"> <action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
</spark>
<ok to="index_es"/>
<error to="Kill"/>
</action>
<action name="index_es">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexOnESJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--index</arg><arg>${esIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
</spark>
<ok to="stats"/>
<error to="Kill"/>
</action>
<action name="stats">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>

View File

@ -8,15 +8,23 @@ import java.util.Collection;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedDatasource;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
@ExtendWith(MockitoExtension.class)
class UpdateMatcherTest { class UpdateMatcherTest {
UpdateMatcher<String> matcher = new EnrichMissingPublicationDate(); UpdateMatcher<String> matcher = new EnrichMissingPublicationDate();
@Mock
private OaBrokerRelatedDatasource targetDs;
@BeforeEach @BeforeEach
void setUp() throws Exception { void setUp() throws Exception {
} }
@ -30,7 +38,7 @@ class UpdateMatcherTest {
final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); final OaBrokerMainEntity p4 = new OaBrokerMainEntity();
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -46,7 +54,7 @@ class UpdateMatcherTest {
res.setPublicationdate("2018"); res.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -62,7 +70,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018"); p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1); assertTrue(list.size() == 1);
} }
@ -79,7 +87,7 @@ class UpdateMatcherTest {
p2.setPublicationdate("2018"); p2.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -98,7 +106,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018"); p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.isEmpty()); assertTrue(list.isEmpty());
} }
@ -117,7 +125,7 @@ class UpdateMatcherTest {
p4.setPublicationdate("2018"); p4.setPublicationdate("2018");
final Collection<UpdateInfo<String>> list = matcher final Collection<UpdateInfo<String>> list = matcher
.searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); .searchUpdatesForRecord(res, targetDs, Arrays.asList(p1, p2, p3, p4), null);
assertTrue(list.size() == 1); assertTrue(list.size() == 1);
} }

View File

@ -624,6 +624,6 @@
<mockito-core.version>3.3.3</mockito-core.version> <mockito-core.version>3.3.3</mockito-core.version>
<mongodb.driver.version>3.4.2</mongodb.driver.version> <mongodb.driver.version>3.4.2</mongodb.driver.version>
<vtd.version>[2.12,3.0)</vtd.version> <vtd.version>[2.12,3.0)</vtd.version>
<dnet.openaire.broker.common>3.0.0</dnet.openaire.broker.common> <dnet.openaire.broker.common>3.1.0</dnet.openaire.broker.common>
</properties> </properties>
</project> </project>