forked from D-Net/dnet-hadoop
results with all joined entities
This commit is contained in:
parent
99f88e1cb8
commit
04fdcacd83
|
@ -37,7 +37,7 @@ public class EventFactory {
|
|||
final Map<String, Object> map = createMapFromResult(updateInfo);
|
||||
|
||||
final String eventId = calculateEventId(
|
||||
updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
|
||||
updateInfo.getTopicPath(), updateInfo.getTarget().getResult().getOriginalId().get(0),
|
||||
updateInfo.getHighlightValueAsString());
|
||||
|
||||
res.setEventId(eventId);
|
||||
|
@ -54,8 +54,8 @@ public class EventFactory {
|
|||
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
|
||||
final Map<String, Object> map = new HashMap<>();
|
||||
|
||||
final Result source = updateInfo.getSource();
|
||||
final Result target = updateInfo.getTarget();
|
||||
final Result source = updateInfo.getSource().getResult();
|
||||
final Result target = updateInfo.getTarget().getResult();
|
||||
|
||||
final List<KeyValue> collectedFrom = target.getCollectedfrom();
|
||||
if (collectedFrom.size() == 1) {
|
||||
|
|
|
@ -3,15 +3,9 @@ package eu.dnetlib.dhp.broker.oa;
|
|||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
|
@ -26,38 +20,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ResultAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ResultGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelationsAggregator;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
@ -72,40 +44,6 @@ public class GenerateEventsApplication {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
|
||||
|
||||
// Simple Matchers
|
||||
private static final UpdateMatcher<Result, ?> enrichMissingAbstract = new EnrichMissingAbstract();
|
||||
private static final UpdateMatcher<Result, ?> enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
|
||||
private static final UpdateMatcher<Result, ?> enrichMissingOpenAccess = new EnrichMissingOpenAccess();
|
||||
private static final UpdateMatcher<Result, ?> enrichMissingPid = new EnrichMissingPid();
|
||||
private static final UpdateMatcher<Result, ?> enrichMissingPublicationDate = new EnrichMissingPublicationDate();
|
||||
private static final UpdateMatcher<Result, ?> enrichMissingSubject = new EnrichMissingSubject();
|
||||
private static final UpdateMatcher<Result, ?> enrichMoreOpenAccess = new EnrichMoreOpenAccess();
|
||||
private static final UpdateMatcher<Result, ?> enrichMorePid = new EnrichMorePid();
|
||||
private static final UpdateMatcher<Result, ?> enrichMoreSubject = new EnrichMoreSubject();
|
||||
|
||||
// Advanced matchers
|
||||
private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMissingProject = new EnrichMissingProject();
|
||||
private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMoreProject = new EnrichMoreProject();
|
||||
|
||||
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMissingSoftware = new EnrichMissingSoftware();
|
||||
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
|
||||
|
||||
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
|
||||
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
|
||||
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
|
||||
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
|
||||
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
|
||||
|
||||
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
|
||||
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
|
||||
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
|
||||
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
|
||||
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
|
||||
|
||||
// Aggregators
|
||||
private static final TypedColumn<Tuple2<Result, Relation>, ResultGroup> resultAggrTypedColumn = new ResultAggregator()
|
||||
.toColumn();
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
@ -145,14 +83,10 @@ public class GenerateEventsApplication {
|
|||
final Dataset<Event> all = spark.emptyDataset(Encoders.kryo(Event.class));
|
||||
|
||||
for (final Class<? extends Result> r1 : BrokerConstants.RESULT_CLASSES) {
|
||||
all.union(generateSimpleEvents(spark, graphPath, r1, dedupConfig));
|
||||
|
||||
for (final Class<? extends Result> r2 : BrokerConstants.RESULT_CLASSES) {
|
||||
all.union(generateRelationEvents(spark, graphPath, r1, r2, dedupConfig));
|
||||
}
|
||||
all.union(generateEvents(spark, graphPath, r1, dedupConfig));
|
||||
}
|
||||
|
||||
all.write().mode(SaveMode.Overwrite).json(eventsPath);
|
||||
all.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(eventsPath);
|
||||
});
|
||||
|
||||
}
|
||||
|
@ -161,203 +95,83 @@ public class GenerateEventsApplication {
|
|||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
private static <R extends Result> Dataset<Event> generateSimpleEvents(final SparkSession spark,
|
||||
private static <SRC extends Result> Dataset<Event> generateEvents(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<R> resultClazz,
|
||||
final Class<SRC> sourceClass,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Dataset<Result> results = readPath(
|
||||
spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), Result.class)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference());
|
||||
final Dataset<ResultWithRelations> results = expandResultsWithRelations(spark, graphPath, sourceClass);
|
||||
|
||||
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
final TypedColumn<Tuple2<ResultWithRelations, Relation>, ResultGroup> aggr = new ResultAggregator()
|
||||
.toColumn();
|
||||
|
||||
return results
|
||||
.joinWith(mergedRels, results.col("id").equalTo(mergedRels.col("source")), "inner")
|
||||
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||
.agg(resultAggrTypedColumn)
|
||||
.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<ResultWithRelations, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
|
||||
.filter(ResultGroup::isValid)
|
||||
.map(
|
||||
(MapFunction<ResultGroup, EventGroup>) g -> GenerateEventsApplication
|
||||
.generateSimpleEvents(g, dedupConfig),
|
||||
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
|
||||
Encoders.kryo(EventGroup.class))
|
||||
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
|
||||
}
|
||||
|
||||
private static EventGroup generateSimpleEvents(final ResultGroup results, final DedupConfig dedupConfig) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
for (final Result target : results.getData()) {
|
||||
list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMorePid.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
}
|
||||
|
||||
final EventGroup events = new EventGroup();
|
||||
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
|
||||
return events;
|
||||
}
|
||||
|
||||
private static <SRC extends Result, TRG extends OafEntity> Dataset<Event> generateRelationEvents(
|
||||
private static <SRC extends Result> Dataset<ResultWithRelations> expandResultsWithRelations(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass,
|
||||
final Class<TRG> targetClass,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Dataset<Result> sources = readPath(
|
||||
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference());
|
||||
|
||||
final Dataset<TRG> targets = readPath(
|
||||
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
|
||||
|
||||
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
final Class<SRC> sourceClass) {
|
||||
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
|
||||
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
|
||||
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
|
||||
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
|
||||
|
||||
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
final Dataset<ResultGroup> duplicates = sources
|
||||
.joinWith(mergedRels, sources.col("id").equalTo(rels.col("source")), "inner")
|
||||
.groupByKey((MapFunction<Tuple2<Result, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||
.agg(resultAggrTypedColumn)
|
||||
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
|
||||
.filter(ResultGroup::isValid);
|
||||
final Dataset<ResultWithRelations> r0 = readPath(
|
||||
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||
.map(r -> new ResultWithRelations(r), Encoders.kryo(ResultWithRelations.class));
|
||||
final Dataset<ResultWithRelations> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
|
||||
final Dataset<ResultWithRelations> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
|
||||
final Dataset<ResultWithRelations> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
|
||||
final Dataset<ResultWithRelations> r4 = join(
|
||||
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
|
||||
;
|
||||
|
||||
if (targetClass == Project.class) {
|
||||
// TODO join using: generateProjectsEvents
|
||||
} else if (targetClass == Software.class) {
|
||||
// TODO join using: generateSoftwareEvents
|
||||
} else if (targetClass == Publication.class) {
|
||||
// TODO join using: generatePublicationRelatedEvents
|
||||
} else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) {
|
||||
// TODO join using: generateDatasetRelatedEvents
|
||||
}
|
||||
|
||||
return null;
|
||||
return r4;
|
||||
}
|
||||
|
||||
private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects,
|
||||
final DedupConfig dedupConfig) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
for (final Pair<Result, List<Project>> target : childrenWithProjects) {
|
||||
list.addAll(enrichMissingProject.searchUpdatesForRecord(target, childrenWithProjects, dedupConfig));
|
||||
list.addAll(enrichMoreProject.searchUpdatesForRecord(target, childrenWithProjects, dedupConfig));
|
||||
}
|
||||
|
||||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||
private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets, final Dataset<Relation> rels,
|
||||
final Class<RT> clazz) {
|
||||
return rels
|
||||
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
|
||||
.map(
|
||||
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
|
||||
Encoders.kryo(clazz));
|
||||
}
|
||||
|
||||
private List<Event> generateSoftwareEvents(final Collection<Pair<Result, List<Software>>> childrenWithSoftwares,
|
||||
final DedupConfig dedupConfig) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
private static <T> Dataset<ResultWithRelations> join(final Dataset<ResultWithRelations> sources,
|
||||
final Dataset<Relation> rels,
|
||||
final Dataset<T> typedRels) {
|
||||
|
||||
for (final Pair<Result, List<Software>> target : childrenWithSoftwares) {
|
||||
list.addAll(enrichMissingSoftware.searchUpdatesForRecord(target, childrenWithSoftwares, dedupConfig));
|
||||
list.addAll(enrichMoreSoftware.searchUpdatesForRecord(target, childrenWithSoftwares, dedupConfig));
|
||||
}
|
||||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<Event> generatePublicationRelatedEvents(final String relType,
|
||||
final Collection<Pair<Result, Map<String, List<Publication>>>> childrenWithRels,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
final List<Pair<Result, List<Publication>>> cleanedChildrens = childrenWithRels
|
||||
.stream()
|
||||
.filter(p -> p.getRight().containsKey(relType))
|
||||
.map(p -> Pair.of(p.getLeft(), p.getRight().get(relType)))
|
||||
.filter(p -> p.getRight().size() > 0)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (final Pair<Result, List<Publication>> target : cleanedChildrens) {
|
||||
if (relType.equals("isRelatedTo")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMisissingPublicationIsRelatedTo
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("references")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingPublicationReferences
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("isReferencedBy")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingPublicationIsReferencedBy
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("isSupplementedTo")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingPublicationIsSupplementedTo
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("isSupplementedBy")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingPublicationIsSupplementedBy
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
}
|
||||
}
|
||||
|
||||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
private List<Event> generateDatasetRelatedEvents(final String relType,
|
||||
final Collection<Pair<Result, Map<String, List<eu.dnetlib.dhp.schema.oaf.Dataset>>>> childrenWithRels,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
final List<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>> cleanedChildrens = childrenWithRels
|
||||
.stream()
|
||||
.filter(p -> p.getRight().containsKey(relType))
|
||||
.map(p -> Pair.of(p.getLeft(), p.getRight().get(relType)))
|
||||
.filter(p -> p.getRight().size() > 0)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (final Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>> target : cleanedChildrens) {
|
||||
if (relType.equals("isRelatedTo")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMisissingDatasetIsRelatedTo
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("references")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingDatasetReferences.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("isReferencedBy")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingDatasetIsReferencedBy
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("isSupplementedTo")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingDatasetIsSupplementedTo
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
} else if (relType.equals("isSupplementedBy")) {
|
||||
list
|
||||
.addAll(
|
||||
enrichMissingDatasetIsSupplementedBy
|
||||
.searchUpdatesForRecord(target, cleanedChildrens, dedupConfig));
|
||||
}
|
||||
}
|
||||
|
||||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||
final TypedColumn<Tuple2<ResultWithRelations, T>, ResultWithRelations> aggr = new ResultWithRelationsAggregator<T>()
|
||||
.toColumn();
|
||||
;
|
||||
|
||||
return sources
|
||||
.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<ResultWithRelations, T>, String>) t -> t._1.getResult().getId(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map(t -> t._2, Encoders.kryo(ResultWithRelations.class));
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
|
@ -386,7 +200,6 @@ public class GenerateEventsApplication {
|
|||
// dedupConfig.getWf().setConfigurationId("???");
|
||||
|
||||
return dedupConfig;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,10 +11,11 @@ import org.apache.commons.codec.digest.DigestUtils;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public abstract class UpdateMatcher<K, T> {
|
||||
public abstract class UpdateMatcher<T> {
|
||||
|
||||
private final boolean multipleUpdate;
|
||||
|
||||
|
@ -22,12 +23,13 @@ public abstract class UpdateMatcher<K, T> {
|
|||
this.multipleUpdate = multipleUpdate;
|
||||
}
|
||||
|
||||
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final K res, final Collection<K> others,
|
||||
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final ResultWithRelations res,
|
||||
final Collection<ResultWithRelations> others,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
|
||||
|
||||
for (final K source : others) {
|
||||
for (final ResultWithRelations source : others) {
|
||||
if (source != res) {
|
||||
for (final UpdateInfo<T> info : findUpdates(source, res, dedupConfig)) {
|
||||
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
|
||||
|
@ -53,7 +55,8 @@ public abstract class UpdateMatcher<K, T> {
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract List<UpdateInfo<T>> findUpdates(K source, K target, DedupConfig dedupConfig);
|
||||
protected abstract List<UpdateInfo<T>> findUpdates(ResultWithRelations source, ResultWithRelations target,
|
||||
DedupConfig dedupConfig);
|
||||
|
||||
protected static boolean isMissing(final List<Field<String>> list) {
|
||||
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());
|
||||
|
|
|
@ -5,18 +5,17 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public abstract class AbstractEnrichMissingDataset
|
||||
extends UpdateMatcher<Pair<Result, List<Dataset>>, eu.dnetlib.broker.objects.Dataset> {
|
||||
extends UpdateMatcher<eu.dnetlib.broker.objects.Dataset> {
|
||||
|
||||
private final Topic topic;
|
||||
|
||||
|
@ -25,21 +24,27 @@ public abstract class AbstractEnrichMissingDataset
|
|||
this.topic = topic;
|
||||
}
|
||||
|
||||
protected abstract boolean filterByType(String relType);
|
||||
|
||||
@Override
|
||||
protected final List<UpdateInfo<eu.dnetlib.broker.objects.Dataset>> findUpdates(
|
||||
final Pair<Result, List<Dataset>> source,
|
||||
final Pair<Result, List<Dataset>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Set<String> existingDatasets = target
|
||||
.getRight()
|
||||
.getDatasets()
|
||||
.stream()
|
||||
.filter(rel -> filterByType(rel.getRelType()))
|
||||
.map(RelatedDataset::getRelDataset)
|
||||
.map(Dataset::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getRight()
|
||||
.getDatasets()
|
||||
.stream()
|
||||
.filter(rel -> filterByType(rel.getRelType()))
|
||||
.map(RelatedDataset::getRelDataset)
|
||||
.filter(d -> !existingDatasets.contains(d.getId()))
|
||||
.map(ConversionUtils::oafDatasetToBrokerDataset)
|
||||
.map(i -> generateUpdateInfo(i, source, target, dedupConfig))
|
||||
|
@ -49,12 +54,12 @@ public abstract class AbstractEnrichMissingDataset
|
|||
|
||||
protected final UpdateInfo<eu.dnetlib.broker.objects.Dataset> generateUpdateInfo(
|
||||
final eu.dnetlib.broker.objects.Dataset highlightValue,
|
||||
final Pair<Result, List<Dataset>> source,
|
||||
final Pair<Result, List<Dataset>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
getTopic(),
|
||||
highlightValue, source.getLeft(), target.getLeft(),
|
||||
highlightValue, source, target,
|
||||
(p, rel) -> p.getDatasets().add(rel),
|
||||
rel -> rel.getInstances().get(0).getUrl(),
|
||||
dedupConfig);
|
||||
|
@ -63,4 +68,5 @@ public abstract class AbstractEnrichMissingDataset
|
|||
public Topic getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsReferencedBy extends AbstractEnrichMissingDat
|
|||
super(Topic.ENRICH_MISSING_DATASET_IS_REFERENCED_BY);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isReferencedBy");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsRelatedTo extends AbstractEnrichMissingDatase
|
|||
super(Topic.ENRICH_MISSING_DATASET_IS_RELATED_TO);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isRelatedTo");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsSupplementedBy extends AbstractEnrichMissingD
|
|||
super(Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isSupplementedBy");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingDatasetIsSupplementedTo extends AbstractEnrichMissingD
|
|||
super(Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isSupplementedTo");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingDatasetReferences extends AbstractEnrichMissingDataset
|
|||
super(Topic.ENRICH_MISSING_DATASET_REFERENCES);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("references");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,34 +5,33 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingProject
|
||||
extends UpdateMatcher<Pair<Result, List<Project>>, eu.dnetlib.broker.objects.Project> {
|
||||
extends UpdateMatcher<eu.dnetlib.broker.objects.Project> {
|
||||
|
||||
public EnrichMissingProject() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
||||
final Pair<Result, List<Project>> target,
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
if (source.getRight().isEmpty()) {
|
||||
if (source.getProjects().isEmpty()) {
|
||||
return Arrays.asList();
|
||||
} else {
|
||||
return target
|
||||
.getRight()
|
||||
.getProjects()
|
||||
.stream()
|
||||
.map(RelatedProject::getRelProject)
|
||||
.map(ConversionUtils::oafProjectToBrokerProject)
|
||||
.map(p -> generateUpdateInfo(p, source, target, dedupConfig))
|
||||
.collect(Collectors.toList());
|
||||
|
@ -41,12 +40,12 @@ public class EnrichMissingProject
|
|||
|
||||
public UpdateInfo<eu.dnetlib.broker.objects.Project> generateUpdateInfo(
|
||||
final eu.dnetlib.broker.objects.Project highlightValue,
|
||||
final Pair<Result, List<Project>> source,
|
||||
final Pair<Result, List<Project>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_PROJECT,
|
||||
highlightValue, source.getLeft(), target.getLeft(),
|
||||
highlightValue, source, target,
|
||||
(p, prj) -> p.getProjects().add(prj),
|
||||
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode(), dedupConfig);
|
||||
}
|
||||
|
|
|
@ -5,36 +5,37 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMoreProject extends UpdateMatcher<Pair<Result, List<Project>>, eu.dnetlib.broker.objects.Project> {
|
||||
public class EnrichMoreProject extends UpdateMatcher<eu.dnetlib.broker.objects.Project> {
|
||||
|
||||
public EnrichMoreProject() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
|
||||
final Pair<Result, List<Project>> target,
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Set<String> existingProjects = source
|
||||
.getRight()
|
||||
.getProjects()
|
||||
.stream()
|
||||
.map(RelatedProject::getRelProject)
|
||||
.map(Project::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return target
|
||||
.getRight()
|
||||
.getProjects()
|
||||
.stream()
|
||||
.map(RelatedProject::getRelProject)
|
||||
.filter(p -> !existingProjects.contains(p.getId()))
|
||||
.map(ConversionUtils::oafProjectToBrokerProject)
|
||||
.map(p -> generateUpdateInfo(p, source, target, dedupConfig))
|
||||
|
@ -43,12 +44,12 @@ public class EnrichMoreProject extends UpdateMatcher<Pair<Result, List<Project>>
|
|||
|
||||
public UpdateInfo<eu.dnetlib.broker.objects.Project> generateUpdateInfo(
|
||||
final eu.dnetlib.broker.objects.Project highlightValue,
|
||||
final Pair<Result, List<Project>> source,
|
||||
final Pair<Result, List<Project>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MORE_PROJECT,
|
||||
highlightValue, source.getLeft(), target.getLeft(),
|
||||
highlightValue, source, target,
|
||||
(p, prj) -> p.getProjects().add(prj),
|
||||
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode(), dedupConfig);
|
||||
}
|
||||
|
|
|
@ -5,18 +5,17 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public abstract class AbstractEnrichMissingPublication
|
||||
extends UpdateMatcher<Pair<Result, List<Publication>>, eu.dnetlib.broker.objects.Publication> {
|
||||
extends UpdateMatcher<eu.dnetlib.broker.objects.Publication> {
|
||||
|
||||
private final Topic topic;
|
||||
|
||||
|
@ -25,21 +24,27 @@ public abstract class AbstractEnrichMissingPublication
|
|||
this.topic = topic;
|
||||
}
|
||||
|
||||
protected abstract boolean filterByType(String relType);
|
||||
|
||||
@Override
|
||||
protected final List<UpdateInfo<eu.dnetlib.broker.objects.Publication>> findUpdates(
|
||||
final Pair<Result, List<Publication>> source,
|
||||
final Pair<Result, List<Publication>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Set<String> existingPublications = target
|
||||
.getRight()
|
||||
.getPublications()
|
||||
.stream()
|
||||
.filter(rel -> filterByType(rel.getRelType()))
|
||||
.map(RelatedPublication::getRelPublication)
|
||||
.map(Publication::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getRight()
|
||||
.getPublications()
|
||||
.stream()
|
||||
.filter(rel -> filterByType(rel.getRelType()))
|
||||
.map(RelatedPublication::getRelPublication)
|
||||
.filter(d -> !existingPublications.contains(d.getId()))
|
||||
.map(ConversionUtils::oafResultToBrokerPublication)
|
||||
.map(i -> generateUpdateInfo(i, source, target, dedupConfig))
|
||||
|
@ -49,12 +54,12 @@ public abstract class AbstractEnrichMissingPublication
|
|||
|
||||
protected final UpdateInfo<eu.dnetlib.broker.objects.Publication> generateUpdateInfo(
|
||||
final eu.dnetlib.broker.objects.Publication highlightValue,
|
||||
final Pair<Result, List<Publication>> source,
|
||||
final Pair<Result, List<Publication>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
getTopic(),
|
||||
highlightValue, source.getLeft(), target.getLeft(),
|
||||
highlightValue, source, target,
|
||||
(p, rel) -> p.getPublications().add(rel),
|
||||
rel -> rel.getInstances().get(0).getUrl(), dedupConfig);
|
||||
}
|
||||
|
|
|
@ -9,4 +9,8 @@ public class EnrichMissingPublicationIsReferencedBy extends AbstractEnrichMissin
|
|||
super(Topic.ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isReferencedBy");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingPublicationIsRelatedTo extends AbstractEnrichMissingPu
|
|||
super(Topic.ENRICH_MISSING_PUBLICATION_IS_RELATED_TO);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isRelatedTo");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,8 @@ public class EnrichMissingPublicationIsSupplementedBy extends AbstractEnrichMiss
|
|||
super(Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isSupplementedBy");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingPublicationIsSupplementedTo extends AbstractEnrichMiss
|
|||
super(Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("isSupplementedTo");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,4 +9,9 @@ public class EnrichMissingPublicationReferences extends AbstractEnrichMissingPub
|
|||
super(Topic.ENRICH_MISSING_PUBLICATION_REFERENCES);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean filterByType(final String relType) {
|
||||
return relType.equals("references");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,18 +5,16 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingSoftware
|
||||
extends UpdateMatcher<Pair<Result, List<Software>>, eu.dnetlib.broker.objects.Software> {
|
||||
extends UpdateMatcher<eu.dnetlib.broker.objects.Software> {
|
||||
|
||||
public EnrichMissingSoftware() {
|
||||
super(true);
|
||||
|
@ -24,16 +22,17 @@ public class EnrichMissingSoftware
|
|||
|
||||
@Override
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
||||
final Pair<Result, List<Software>> source,
|
||||
final Pair<Result, List<Software>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
if (source.getRight().isEmpty()) {
|
||||
if (source.getSoftwares().isEmpty()) {
|
||||
return Arrays.asList();
|
||||
} else {
|
||||
return target
|
||||
.getRight()
|
||||
.getSoftwares()
|
||||
.stream()
|
||||
.map(RelatedSoftware::getRelSoftware)
|
||||
.map(ConversionUtils::oafSoftwareToBrokerSoftware)
|
||||
.map(p -> generateUpdateInfo(p, source, target, dedupConfig))
|
||||
.collect(Collectors.toList());
|
||||
|
@ -42,12 +41,12 @@ public class EnrichMissingSoftware
|
|||
|
||||
public UpdateInfo<eu.dnetlib.broker.objects.Software> generateUpdateInfo(
|
||||
final eu.dnetlib.broker.objects.Software highlightValue,
|
||||
final Pair<Result, List<Software>> source,
|
||||
final Pair<Result, List<Software>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_SOFTWARE,
|
||||
highlightValue, source.getLeft(), target.getLeft(),
|
||||
highlightValue, source, target,
|
||||
(p, s) -> p.getSoftwares().add(s),
|
||||
s -> s.getName(), dedupConfig);
|
||||
}
|
||||
|
|
|
@ -5,18 +5,17 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMoreSoftware
|
||||
extends UpdateMatcher<Pair<Result, List<Software>>, eu.dnetlib.broker.objects.Software> {
|
||||
extends UpdateMatcher<eu.dnetlib.broker.objects.Software> {
|
||||
|
||||
public EnrichMoreSoftware() {
|
||||
super(true);
|
||||
|
@ -24,19 +23,21 @@ public class EnrichMoreSoftware
|
|||
|
||||
@Override
|
||||
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
|
||||
final Pair<Result, List<Software>> source,
|
||||
final Pair<Result, List<Software>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Set<String> existingSoftwares = source
|
||||
.getRight()
|
||||
.getSoftwares()
|
||||
.stream()
|
||||
.map(RelatedSoftware::getRelSoftware)
|
||||
.map(Software::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return target
|
||||
.getRight()
|
||||
.getSoftwares()
|
||||
.stream()
|
||||
.map(RelatedSoftware::getRelSoftware)
|
||||
.filter(p -> !existingSoftwares.contains(p.getId()))
|
||||
.map(ConversionUtils::oafSoftwareToBrokerSoftware)
|
||||
.map(p -> generateUpdateInfo(p, source, target, dedupConfig))
|
||||
|
@ -45,12 +46,12 @@ public class EnrichMoreSoftware
|
|||
|
||||
public UpdateInfo<eu.dnetlib.broker.objects.Software> generateUpdateInfo(
|
||||
final eu.dnetlib.broker.objects.Software highlightValue,
|
||||
final Pair<Result, List<Software>> source,
|
||||
final Pair<Result, List<Software>> target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MORE_SOFTWARE,
|
||||
highlightValue, source.getLeft(), target.getLeft(),
|
||||
highlightValue, source, target,
|
||||
(p, s) -> p.getSoftwares().add(s),
|
||||
s -> s.getName(), dedupConfig);
|
||||
}
|
||||
|
|
|
@ -8,28 +8,31 @@ import java.util.List;
|
|||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingAbstract extends UpdateMatcher<Result, String> {
|
||||
public class EnrichMissingAbstract extends UpdateMatcher<String> {
|
||||
|
||||
public EnrichMissingAbstract() {
|
||||
super(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<String>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) {
|
||||
if (isMissing(target.getResult().getDescription()) && !isMissing(source.getResult().getDescription())) {
|
||||
return Arrays
|
||||
.asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target, dedupConfig));
|
||||
.asList(
|
||||
generateUpdateInfo(
|
||||
source.getResult().getDescription().get(0).getValue(), source, target, dedupConfig));
|
||||
}
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
public UpdateInfo<String> generateUpdateInfo(final String highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_ABSTRACT,
|
||||
|
|
|
@ -9,17 +9,18 @@ import org.apache.commons.lang3.tuple.Pair;
|
|||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Result, Pair<String, String>> {
|
||||
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Pair<String, String>> {
|
||||
|
||||
public EnrichMissingAuthorOrcid() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
// TODO
|
||||
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
|
||||
|
@ -27,8 +28,8 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher<Result, Pair<String,
|
|||
}
|
||||
|
||||
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_AUTHOR_ORCID,
|
||||
|
|
|
@ -11,19 +11,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
|||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
|
||||
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> {
|
||||
|
||||
public EnrichMissingOpenAccess() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Instance>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
final long count = target
|
||||
.getResult()
|
||||
.getInstance()
|
||||
.stream()
|
||||
.map(i -> i.getAccessright().getClassid())
|
||||
|
@ -35,6 +37,7 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
|
|||
}
|
||||
|
||||
return source
|
||||
.getResult()
|
||||
.getInstance()
|
||||
.stream()
|
||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||
|
@ -45,8 +48,8 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
|
|||
}
|
||||
|
||||
public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_OA_VERSION,
|
||||
|
|
|
@ -10,25 +10,27 @@ import eu.dnetlib.dhp.broker.model.Topic;
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingPid extends UpdateMatcher<Result, Pid> {
|
||||
public class EnrichMissingPid extends UpdateMatcher<Pid> {
|
||||
|
||||
public EnrichMissingPid() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Pid>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
final long count = target.getPid().size();
|
||||
final long count = target.getResult().getPid().size();
|
||||
|
||||
if (count > 0) {
|
||||
return Arrays.asList();
|
||||
}
|
||||
|
||||
return source
|
||||
.getResult()
|
||||
.getPid()
|
||||
.stream()
|
||||
.map(ConversionUtils::oafPidToBrokerPid)
|
||||
|
@ -36,7 +38,9 @@ public class EnrichMissingPid extends UpdateMatcher<Result, Pid> {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target,
|
||||
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_PID,
|
||||
|
|
|
@ -8,28 +8,32 @@ import java.util.List;
|
|||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingPublicationDate extends UpdateMatcher<Result, String> {
|
||||
public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
|
||||
|
||||
public EnrichMissingPublicationDate() {
|
||||
super(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<String>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) {
|
||||
if (isMissing(target.getResult().getDateofacceptance())
|
||||
&& !isMissing(source.getResult().getDateofacceptance())) {
|
||||
return Arrays
|
||||
.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target, dedupConfig));
|
||||
.asList(
|
||||
generateUpdateInfo(
|
||||
source.getResult().getDateofacceptance().getValue(), source, target, dedupConfig));
|
||||
}
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
public UpdateInfo<String> generateUpdateInfo(final String highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_PUBLICATION_DATE,
|
||||
|
|
|
@ -11,21 +11,23 @@ import eu.dnetlib.dhp.broker.model.Topic;
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMissingSubject extends UpdateMatcher<Result, Pair<String, String>> {
|
||||
public class EnrichMissingSubject extends UpdateMatcher<Pair<String, String>> {
|
||||
|
||||
public EnrichMissingSubject() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
final Set<String> existingTypes = target
|
||||
.getResult()
|
||||
.getSubject()
|
||||
.stream()
|
||||
.map(StructuredProperty::getQualifier)
|
||||
|
@ -33,6 +35,7 @@ public class EnrichMissingSubject extends UpdateMatcher<Result, Pair<String, Str
|
|||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getResult()
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(pid -> !existingTypes.contains(pid.getQualifier().getClassid()))
|
||||
|
@ -42,8 +45,8 @@ public class EnrichMissingSubject extends UpdateMatcher<Result, Pair<String, Str
|
|||
}
|
||||
|
||||
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
return new UpdateInfo<>(
|
||||
|
|
|
@ -11,19 +11,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
|||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
|
||||
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> {
|
||||
|
||||
public EnrichMoreOpenAccess() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Instance>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
final Set<String> urls = target
|
||||
.getResult()
|
||||
.getInstance()
|
||||
.stream()
|
||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||
|
@ -32,6 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
|
|||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getResult()
|
||||
.getInstance()
|
||||
.stream()
|
||||
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
|
||||
|
@ -43,8 +46,8 @@ public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
|
|||
}
|
||||
|
||||
public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MORE_OA_VERSION,
|
||||
|
|
|
@ -10,25 +10,28 @@ import eu.dnetlib.dhp.broker.model.Topic;
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMorePid extends UpdateMatcher<Result, Pid> {
|
||||
public class EnrichMorePid extends UpdateMatcher<Pid> {
|
||||
|
||||
public EnrichMorePid() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Pid>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
final Set<String> existingPids = target
|
||||
.getResult()
|
||||
.getPid()
|
||||
.stream()
|
||||
.map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getResult()
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
|
||||
|
@ -37,7 +40,9 @@ public class EnrichMorePid extends UpdateMatcher<Result, Pid> {
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target,
|
||||
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MORE_PID,
|
||||
|
|
|
@ -11,25 +11,28 @@ import eu.dnetlib.dhp.broker.model.Topic;
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EnrichMoreSubject extends UpdateMatcher<Result, Pair<String, String>> {
|
||||
public class EnrichMoreSubject extends UpdateMatcher<Pair<String, String>> {
|
||||
|
||||
public EnrichMoreSubject() {
|
||||
super(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target,
|
||||
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
final Set<String> existingSubjects = target
|
||||
.getResult()
|
||||
.getSubject()
|
||||
.stream()
|
||||
.map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return source
|
||||
.getResult()
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(pid -> !existingSubjects.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
|
||||
|
@ -39,8 +42,8 @@ public class EnrichMoreSubject extends UpdateMatcher<Result, Pair<String, String
|
|||
}
|
||||
|
||||
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
|
||||
final Result source,
|
||||
final Result target,
|
||||
final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
return new UpdateInfo<>(
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class EventFinder {
|
||||
|
||||
private static List<UpdateMatcher<?>> matchers = new ArrayList<>();
|
||||
static {
|
||||
matchers.add(new EnrichMissingAbstract());
|
||||
matchers.add(new EnrichMissingAuthorOrcid());
|
||||
matchers.add(new EnrichMissingOpenAccess());
|
||||
matchers.add(new EnrichMissingPid());
|
||||
matchers.add(new EnrichMissingPublicationDate());
|
||||
matchers.add(new EnrichMissingSubject());
|
||||
matchers.add(new EnrichMoreOpenAccess());
|
||||
matchers.add(new EnrichMorePid());
|
||||
matchers.add(new EnrichMoreSubject());
|
||||
|
||||
// Advanced matchers
|
||||
matchers.add(new EnrichMissingProject());
|
||||
matchers.add(new EnrichMoreProject());
|
||||
matchers.add(new EnrichMissingSoftware());
|
||||
matchers.add(new EnrichMoreSoftware());
|
||||
matchers.add(new EnrichMissingPublicationIsRelatedTo());
|
||||
matchers.add(new EnrichMissingPublicationIsReferencedBy());
|
||||
matchers.add(new EnrichMissingPublicationReferences());
|
||||
matchers.add(new EnrichMissingPublicationIsSupplementedTo());
|
||||
matchers.add(new EnrichMissingPublicationIsSupplementedBy());
|
||||
matchers.add(new EnrichMissingDatasetIsRelatedTo());
|
||||
matchers.add(new EnrichMissingDatasetIsReferencedBy());
|
||||
matchers.add(new EnrichMissingDatasetReferences());
|
||||
matchers.add(new EnrichMissingDatasetIsSupplementedTo());
|
||||
matchers.add(new EnrichMissingDatasetIsSupplementedBy());
|
||||
matchers.add(new EnrichMissingAbstract());
|
||||
}
|
||||
|
||||
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
for (final ResultWithRelations target : results.getData()) {
|
||||
for (final UpdateMatcher<?> matcher : matchers) {
|
||||
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
|
||||
}
|
||||
}
|
||||
|
||||
return asEventGroup(list);
|
||||
}
|
||||
|
||||
private static EventGroup asEventGroup(final List<UpdateInfo<?>> list) {
|
||||
final EventGroup events = new EventGroup();
|
||||
list.stream().map(EventFactory::newBrokerEvent).forEach(events::addElement);
|
||||
return events;
|
||||
}
|
||||
|
||||
}
|
|
@ -14,6 +14,7 @@ import eu.dnetlib.broker.objects.OpenAireEventPayload;
|
|||
import eu.dnetlib.broker.objects.Provenance;
|
||||
import eu.dnetlib.broker.objects.Publication;
|
||||
import eu.dnetlib.dhp.broker.model.Topic;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
@ -28,9 +29,9 @@ public final class UpdateInfo<T> {
|
|||
|
||||
private final T highlightValue;
|
||||
|
||||
private final Result source;
|
||||
private final ResultWithRelations source;
|
||||
|
||||
private final Result target;
|
||||
private final ResultWithRelations target;
|
||||
|
||||
private final BiConsumer<Publication, T> compileHighlight;
|
||||
|
||||
|
@ -40,7 +41,8 @@ public final class UpdateInfo<T> {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
|
||||
|
||||
public UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target,
|
||||
public UpdateInfo(final Topic topic, final T highlightValue, final ResultWithRelations source,
|
||||
final ResultWithRelations target,
|
||||
final BiConsumer<Publication, T> compileHighlight,
|
||||
final Function<T, String> highlightToString,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
@ -50,18 +52,18 @@ public final class UpdateInfo<T> {
|
|||
this.target = target;
|
||||
this.compileHighlight = compileHighlight;
|
||||
this.highlightToString = highlightToString;
|
||||
this.trust = calculateTrust(dedupConfig, source, target);
|
||||
this.trust = calculateTrust(dedupConfig, source.getResult(), target.getResult());
|
||||
}
|
||||
|
||||
public T getHighlightValue() {
|
||||
return highlightValue;
|
||||
}
|
||||
|
||||
public Result getSource() {
|
||||
public ResultWithRelations getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public Result getTarget() {
|
||||
public ResultWithRelations getTarget() {
|
||||
return target;
|
||||
}
|
||||
|
||||
|
@ -101,20 +103,22 @@ public final class UpdateInfo<T> {
|
|||
|
||||
public OpenAireEventPayload asBrokerPayload() {
|
||||
|
||||
final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource());
|
||||
final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource().getResult());
|
||||
compileHighlight.accept(p, getHighlightValue());
|
||||
|
||||
final Publication hl = new Publication();
|
||||
compileHighlight.accept(hl, getHighlightValue());
|
||||
|
||||
final String provId = getSource().getOriginalId().stream().findFirst().orElse(null);
|
||||
final String provId = getSource().getResult().getOriginalId().stream().findFirst().orElse(null);
|
||||
final String provRepo = getSource()
|
||||
.getResult()
|
||||
.getCollectedfrom()
|
||||
.stream()
|
||||
.map(KeyValue::getValue)
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
final String provUrl = getSource()
|
||||
.getResult()
|
||||
.getInstance()
|
||||
.stream()
|
||||
.map(Instance::getUrl)
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.simple;
|
||||
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ResultAggregator extends Aggregator<Tuple2<Result, Relation>, ResultGroup, ResultGroup> {
|
||||
public class ResultAggregator extends Aggregator<Tuple2<ResultWithRelations, Relation>, ResultGroup, ResultGroup> {
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator<Tuple2<Result, Relation>, Resul
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResultGroup reduce(final ResultGroup group, final Tuple2<Result, Relation> t) {
|
||||
public ResultGroup reduce(final ResultGroup group, final Tuple2<ResultWithRelations, Relation> t) {
|
||||
return group.addElement(t._1);
|
||||
}
|
||||
|
|
@ -1,11 +1,11 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util;
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.simple;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations;
|
||||
|
||||
public class ResultGroup implements Serializable {
|
||||
|
||||
|
@ -14,13 +14,13 @@ public class ResultGroup implements Serializable {
|
|||
*/
|
||||
private static final long serialVersionUID = -3360828477088669296L;
|
||||
|
||||
private final List<Result> data = new ArrayList<>();
|
||||
private final List<ResultWithRelations> data = new ArrayList<>();
|
||||
|
||||
public List<Result> getData() {
|
||||
public List<ResultWithRelations> getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
public ResultGroup addElement(final Result elem) {
|
||||
public ResultGroup addElement(final ResultWithRelations elem) {
|
||||
data.add(elem);
|
||||
return this;
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
|
||||
public class RelatedDataset {
|
||||
|
||||
private final String source;
|
||||
private final String relType;
|
||||
private final Dataset relDataset;
|
||||
|
||||
public RelatedDataset(final String source, final String relType, final Dataset relDataset) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relDataset = relDataset;
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public String getRelType() {
|
||||
return relType;
|
||||
}
|
||||
|
||||
public Dataset getRelDataset() {
|
||||
return relDataset;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
||||
public class RelatedEntityFactory {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <RT, T> RT newRelatedEntity(final String sourceId, final String relType, final T target,
|
||||
final Class<RT> clazz) {
|
||||
if (clazz == RelatedProject.class) {
|
||||
return (RT) new RelatedProject(sourceId, relType, (Project) target);
|
||||
}
|
||||
if (clazz == RelatedSoftware.class) {
|
||||
return (RT) new RelatedSoftware(sourceId, relType, (Software) target);
|
||||
}
|
||||
if (clazz == RelatedDataset.class) {
|
||||
return (RT) new RelatedDataset(sourceId, relType, (Dataset) target);
|
||||
}
|
||||
if (clazz == RelatedPublication.class) {
|
||||
return (RT) new RelatedPublication(sourceId, relType, (Publication) target);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
|
||||
public class RelatedProject {
|
||||
|
||||
private final String source;
|
||||
private final String relType;
|
||||
private final Project relProject;
|
||||
|
||||
public RelatedProject(final String source, final String relType, final Project relProject) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relProject = relProject;
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public String getRelType() {
|
||||
return relType;
|
||||
}
|
||||
|
||||
public Project getRelProject() {
|
||||
return relProject;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
|
||||
public class RelatedPublication {
|
||||
|
||||
private final String source;
|
||||
private final String relType;
|
||||
private final Publication relPublication;
|
||||
|
||||
public RelatedPublication(final String source, final String relType, final Publication relPublication) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relPublication = relPublication;
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public String getRelType() {
|
||||
return relType;
|
||||
}
|
||||
|
||||
public Publication getRelPublication() {
|
||||
return relPublication;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
||||
public class RelatedSoftware {
|
||||
|
||||
private final String source;
|
||||
private final String relType;
|
||||
private final Software relSoftware;
|
||||
|
||||
public RelatedSoftware(final String source, final String relType, final Software relSoftware) {
|
||||
this.source = source;
|
||||
this.relType = relType;
|
||||
this.relSoftware = relSoftware;
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
public String getRelType() {
|
||||
return relType;
|
||||
}
|
||||
|
||||
public Software getRelSoftware() {
|
||||
return relSoftware;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class ResultWithRelations implements Serializable {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = -1368401915974311571L;
|
||||
|
||||
private Result result;
|
||||
|
||||
private final List<RelatedDataset> datasets = new ArrayList<>();
|
||||
private final List<RelatedPublication> publications = new ArrayList<>();
|
||||
private final List<RelatedSoftware> softwares = new ArrayList<>();
|
||||
private final List<RelatedProject> projects = new ArrayList<>();
|
||||
|
||||
public ResultWithRelations() {
|
||||
}
|
||||
|
||||
public ResultWithRelations(final Result result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public Result getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public List<RelatedDataset> getDatasets() {
|
||||
return datasets;
|
||||
}
|
||||
|
||||
public List<RelatedPublication> getPublications() {
|
||||
return publications;
|
||||
}
|
||||
|
||||
public List<RelatedSoftware> getSoftwares() {
|
||||
return softwares;
|
||||
}
|
||||
|
||||
public List<RelatedProject> getProjects() {
|
||||
return projects;
|
||||
}
|
||||
|
||||
public void setResult(final Result result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
|
||||
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.expressions.Aggregator;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ResultWithRelationsAggregator<T>
|
||||
extends Aggregator<Tuple2<ResultWithRelations, T>, ResultWithRelations, ResultWithRelations> {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = -3687878788861013488L;
|
||||
|
||||
@Override
|
||||
public ResultWithRelations zero() {
|
||||
return new ResultWithRelations();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultWithRelations finish(final ResultWithRelations g) {
|
||||
return g;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultWithRelations reduce(final ResultWithRelations g, final Tuple2<ResultWithRelations, T> t) {
|
||||
if (g.getResult() == null) {
|
||||
return t._1;
|
||||
} else if (t._2 instanceof RelatedSoftware) {
|
||||
g.getSoftwares().add((RelatedSoftware) t._2);
|
||||
} else if (t._2 instanceof RelatedDataset) {
|
||||
g.getDatasets().add((RelatedDataset) t._2);
|
||||
} else if (t._2 instanceof RelatedPublication) {
|
||||
g.getPublications().add((RelatedPublication) t._2);
|
||||
} else if (t._2 instanceof RelatedProject) {
|
||||
g.getProjects().add((RelatedProject) t._2);
|
||||
}
|
||||
return g;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultWithRelations merge(final ResultWithRelations g1, final ResultWithRelations g2) {
|
||||
if (g1.getResult() != null) {
|
||||
g1.getSoftwares().addAll(g2.getSoftwares());
|
||||
g1.getDatasets().addAll(g2.getDatasets());
|
||||
g1.getPublications().addAll(g2.getPublications());
|
||||
g1.getProjects().addAll(g2.getProjects());
|
||||
return g1;
|
||||
} else {
|
||||
return g2;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<ResultWithRelations> bufferEncoder() {
|
||||
return Encoders.kryo(ResultWithRelations.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Encoder<ResultWithRelations> outputEncoder() {
|
||||
return Encoders.kryo(ResultWithRelations.class);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue