1
0
Fork 0

Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Miriam Baglioni 2020-05-22 13:53:23 +02:00
commit 70389b0a30
44 changed files with 1439 additions and 451 deletions

View File

@ -4,48 +4,45 @@ package eu.dnetlib.dhp.broker.model;
public enum Topic {
// ENRICHMENT MISSING
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"),
ENRICH_MISSING_ABSTRACT("ENRICH/MISSING/ABSTRACT"),
ENRICH_MISSING_PUBLICATION_DATE("ENRICH/MISSING/PUBLICATION_DATE"),
ENRICH_MISSING_PID("ENRICH/MISSING/PID"),
ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"),
ENRICH_MISSING_SOFTWARE("ENRICH/MISSING/SOFTWARE"),
ENRICH_MISSING_SUBJECT_MESHEUROPMC("ENRICH/MISSING/SUBJECT/MESHEUROPMC"),
ENRICH_MISSING_SUBJECT_ARXIV("ENRICH/MISSING/SUBJECT/ARXIV"),
ENRICH_MISSING_SUBJECT_JEL("ENRICH/MISSING/SUBJECT/JEL"),
ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"),
ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"),
ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"),
ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"),
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT(
"ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE(
"ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID(
"ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE(
"ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC(
"ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV(
"ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL(
"ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC(
"ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM(
"ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK(
"ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID(
"ENRICH/MISSING/AUTHOR/ORCID"),
// ENRICHMENT MORE
ENRICH_MORE_PID("ENRICH/MORE/PID"),
ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"),
ENRICH_MORE_ABSTRACT("ENRICH/MORE/ABSTRACT"),
ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"),
ENRICH_MORE_PROJECT("ENRICH/MORE/PROJECT"),
ENRICH_MORE_SUBJECT_MESHEUROPMC("ENRICH/MORE/SUBJECT/MESHEUROPMC"),
ENRICH_MORE_SUBJECT_ARXIV("ENRICH/MORE/SUBJECT/ARXIV"),
ENRICH_MORE_SUBJECT_JEL("ENRICH/MORE/SUBJECT/JEL"),
ENRICH_MORE_SUBJECT_DDC("ENRICH/MORE/SUBJECT/DDC"),
ENRICH_MORE_SUBJECT_ACM("ENRICH/MORE/SUBJECT/ACM"),
ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT(
"ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT(
"ENRICH/MORE/PROJECT"), ENRICH_MORE_SOFTWARE("ENRICH/MORE/SOFTWARE"), ENRICH_MORE_SUBJECT_MESHEUROPMC(
"ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV(
"ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL(
"ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC(
"ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM(
"ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
// ADDITION
ADD_BY_PROJECT("ADD/BY_PROJECT"),
// OTHER RELS
ENRICH_MISSING_PUBLICATION_IS_RELATED_TO("ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"),
ENRICH_MISSING_PUBLICATION_REFERENCES("ENRICH/MISSING/PUBLICATION/REFERENCES"),
ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY("ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"),
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"),
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"),
ENRICH_MISSING_PUBLICATION_IS_RELATED_TO(
"ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"), ENRICH_MISSING_PUBLICATION_REFERENCES(
"ENRICH/MISSING/PUBLICATION/REFERENCES"), ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY(
"ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"), ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO(
"ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"), ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY(
"ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"),
ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"),
ENRICH_MISSING_DATASET_REFERENCES("ENRICH/MISSING/DATASET/REFERENCES"),
ENRICH_MISSING_DATASET_IS_REFERENCED_BY("ENRICH/MISSING/DATASET/IS_REFERENCED_BY"),
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"),
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY");
ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"), ENRICH_MISSING_DATASET_REFERENCES(
"ENRICH/MISSING/DATASET/REFERENCES"), ENRICH_MISSING_DATASET_IS_REFERENCED_BY(
"ENRICH/MISSING/DATASET/IS_REFERENCED_BY"), ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO(
"ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"), ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY(
"ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY"),;
Topic(final String path) {
this.path = path;
@ -59,7 +56,9 @@ public enum Topic {
public static Topic fromPath(final String path) {
for (final Topic t : Topic.values()) {
if (t.getPath().equals(path)) { return t; }
if (t.getPath().equals(path)) {
return t;
}
}
return null;
}

View File

@ -4,11 +4,14 @@ package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@ -29,18 +32,33 @@ import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingDatasetIsReferencedBy;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingDatasetIsRelatedTo;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingDatasetIsSupplementedBy;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingDatasetIsSupplementedTo;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingDatasetReferences;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationIsReferencedBy;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationIsRelatedTo;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationIsSupplementedBy;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationIsSupplementedTo;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationReferences;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreProject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -50,24 +68,44 @@ public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
private static final UpdateMatcher<?> enrichMissingAbstract = new EnrichMissingAbstract();
private static final UpdateMatcher<?> enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
private static final UpdateMatcher<?> enrichMissingOpenAccess = new EnrichMissingOpenAccess();
private static final UpdateMatcher<?> enrichMissingPid = new EnrichMissingPid();
private static final UpdateMatcher<?> enrichMissingProject = new EnrichMissingProject();
private static final UpdateMatcher<?> enrichMissingPublicationDate = new EnrichMissingPublicationDate();
private static final UpdateMatcher<?> enrichMissingSubject = new EnrichMissingSubject();
private static final UpdateMatcher<?> enrichMoreOpenAccess = new EnrichMoreOpenAccess();
private static final UpdateMatcher<?> enrichMorePid = new EnrichMorePid();
private static final UpdateMatcher<?> enrichMoreSubject = new EnrichMoreSubject();
// Simple Matchers
private static final UpdateMatcher<Result, ?> enrichMissingAbstract = new EnrichMissingAbstract();
private static final UpdateMatcher<Result, ?> enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
private static final UpdateMatcher<Result, ?> enrichMissingOpenAccess = new EnrichMissingOpenAccess();
private static final UpdateMatcher<Result, ?> enrichMissingPid = new EnrichMissingPid();
private static final UpdateMatcher<Result, ?> enrichMissingPublicationDate = new EnrichMissingPublicationDate();
private static final UpdateMatcher<Result, ?> enrichMissingSubject = new EnrichMissingSubject();
private static final UpdateMatcher<Result, ?> enrichMoreOpenAccess = new EnrichMoreOpenAccess();
private static final UpdateMatcher<Result, ?> enrichMorePid = new EnrichMorePid();
private static final UpdateMatcher<Result, ?> enrichMoreSubject = new EnrichMoreSubject();
// Advanced matchers
private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMissingProject = new EnrichMissingProject();
private static final UpdateMatcher<Pair<Result, List<Project>>, ?> enrichMoreProject = new EnrichMoreProject();
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMissingSoftware = new EnrichMissingSoftware();
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
.toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@ -82,9 +120,6 @@ public class GenerateEventsApplication {
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
@ -111,17 +146,17 @@ public class GenerateEventsApplication {
final String graphPath,
final Class<R> resultClazz) {
final Dataset<R> results =
readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
final Dataset<R> results = readPath(
spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<Relation> rels =
readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals("TODO")); // TODO mergedIN
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final Column c = null; // TODO
final Dataset<Row> aa = results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
final Dataset<Row> aa = results
.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupBy(rels.col("target"))
.agg(c)
.filter(x -> x.size() > 1)
@ -134,7 +169,7 @@ public class GenerateEventsApplication {
}
private List<Event> generateSimpleEvents(final Result... children) {
private List<Event> generateSimpleEvents(final Collection<Result> children) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Result target : children) {
@ -142,7 +177,6 @@ public class GenerateEventsApplication {
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingProject.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children));
@ -153,6 +187,87 @@ public class GenerateEventsApplication {
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
private List<Event> generateProjectsEvents(final Collection<Pair<Result, List<Project>>> childrenWithProjects) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Pair<Result, List<Project>> target : childrenWithProjects) {
list.addAll(enrichMissingProject.searchUpdatesForRecord(target, childrenWithProjects));
list.addAll(enrichMoreProject.searchUpdatesForRecord(target, childrenWithProjects));
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
private List<Event> generateSoftwareEvents(final Collection<Pair<Result, List<Software>>> childrenWithSoftwares) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Pair<Result, List<Software>> target : childrenWithSoftwares) {
list.addAll(enrichMissingSoftware.searchUpdatesForRecord(target, childrenWithSoftwares));
list.addAll(enrichMoreSoftware.searchUpdatesForRecord(target, childrenWithSoftwares));
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
private List<Event> generatePublicationRelatedEvents(final String relType,
final Collection<Pair<Result, Map<String, List<Publication>>>> childrenWithRels) {
final List<UpdateInfo<?>> list = new ArrayList<>();
final List<Pair<Result, List<Publication>>> cleanedChildrens = childrenWithRels
.stream()
.filter(p -> p.getRight().containsKey(relType))
.map(p -> Pair.of(p.getLeft(), p.getRight().get(relType)))
.filter(p -> p.getRight().size() > 0)
.collect(Collectors.toList());
for (final Pair<Result, List<Publication>> target : cleanedChildrens) {
if (relType.equals("isRelatedTo")) {
list.addAll(enrichMisissingPublicationIsRelatedTo.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("references")) {
list.addAll(enrichMissingPublicationReferences.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("isReferencedBy")) {
list.addAll(enrichMissingPublicationIsReferencedBy.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("isSupplementedTo")) {
list.addAll(enrichMissingPublicationIsSupplementedTo.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("isSupplementedBy")) {
list.addAll(enrichMissingPublicationIsSupplementedBy.searchUpdatesForRecord(target, cleanedChildrens));
}
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
private List<Event> generateDatasetRelatedEvents(final String relType,
final Collection<Pair<Result, Map<String, List<eu.dnetlib.dhp.schema.oaf.Dataset>>>> childrenWithRels) {
final List<UpdateInfo<?>> list = new ArrayList<>();
final List<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>> cleanedChildrens = childrenWithRels
.stream()
.filter(p -> p.getRight().containsKey(relType))
.map(p -> Pair.of(p.getLeft(), p.getRight().get(relType)))
.filter(p -> p.getRight().size() > 0)
.collect(Collectors.toList());
for (final Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>> target : cleanedChildrens) {
if (relType.equals("isRelatedTo")) {
list.addAll(enrichMisissingDatasetIsRelatedTo.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("references")) {
list.addAll(enrichMissingDatasetReferences.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("isReferencedBy")) {
list.addAll(enrichMissingDatasetIsReferencedBy.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("isSupplementedTo")) {
list.addAll(enrichMissingDatasetIsSupplementedTo.searchUpdatesForRecord(target, cleanedChildrens));
} else if (relType.equals("isSupplementedBy")) {
list.addAll(enrichMissingDatasetIsSupplementedBy.searchUpdatesForRecord(target, cleanedChildrens));
}
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,

View File

@ -9,7 +9,7 @@ import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAbstract extends UpdateMatcher<String> {
public class EnrichMissingAbstract extends UpdateMatcher<Result, String> {
public EnrichMissingAbstract() {
super(false);
@ -24,7 +24,8 @@ public class EnrichMissingAbstract extends UpdateMatcher<String> {
}
@Override
public UpdateInfo<String> generateUpdateInfo(final String highlightValue, final Result source,
public UpdateInfo<String> generateUpdateInfo(final String highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_ABSTRACT,

View File

@ -10,7 +10,7 @@ import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Pair<String, String>> {
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Result, Pair<String, String>> {
public EnrichMissingAuthorOrcid() {
super(true);
@ -24,7 +24,8 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher<Pair<String, String>
@Override
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
final Result source, final Result target) {
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_AUTHOR_ORCID,
highlightValue, source, target,

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingDatasetIsReferencedBy
extends UpdateMatcher<Pair<Result, List<Dataset>>, eu.dnetlib.broker.objects.Dataset> {
public EnrichMissingDatasetIsReferencedBy() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Dataset>> findUpdates(final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
// TODO Auto-generated method stub
return null;
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Dataset> generateUpdateInfo(
final eu.dnetlib.broker.objects.Dataset highlightValue,
final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_DATASET_IS_REFERENCED_BY,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> p.getDatasets().add(rel),
rel -> rel.getInstances().get(0).getUrl());
}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingDatasetIsRelatedTo
extends UpdateMatcher<Pair<Result, List<Dataset>>, eu.dnetlib.broker.objects.Dataset> {
public EnrichMissingDatasetIsRelatedTo() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Dataset>> findUpdates(final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
// TODO Auto-generated method stub
return null;
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Dataset> generateUpdateInfo(
final eu.dnetlib.broker.objects.Dataset highlightValue,
final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_DATASET_IS_RELATED_TO,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> p.getDatasets().add(rel),
rel -> rel.getInstances().get(0).getUrl());
}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingDatasetIsSupplementedBy
extends UpdateMatcher<Pair<Result, List<Dataset>>, eu.dnetlib.broker.objects.Dataset> {
public EnrichMissingDatasetIsSupplementedBy() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Dataset>> findUpdates(final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
// TODO Auto-generated method stub
return null;
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Dataset> generateUpdateInfo(
final eu.dnetlib.broker.objects.Dataset highlightValue,
final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> p.getDatasets().add(rel),
rel -> rel.getInstances().get(0).getUrl());
}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingDatasetIsSupplementedTo
extends UpdateMatcher<Pair<Result, List<Dataset>>, eu.dnetlib.broker.objects.Dataset> {
public EnrichMissingDatasetIsSupplementedTo() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Dataset>> findUpdates(final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
// TODO Auto-generated method stub
return null;
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Dataset> generateUpdateInfo(
final eu.dnetlib.broker.objects.Dataset highlightValue,
final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> p.getDatasets().add(rel),
rel -> rel.getInstances().get(0).getUrl());
}
}

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingDatasetReferences
extends UpdateMatcher<Pair<Result, List<Dataset>>, eu.dnetlib.broker.objects.Dataset> {
public EnrichMissingDatasetReferences() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Dataset>> findUpdates(final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
// TODO Auto-generated method stub
return null;
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Dataset> generateUpdateInfo(
final eu.dnetlib.broker.objects.Dataset highlightValue,
final Pair<Result, List<Dataset>> source,
final Pair<Result, List<Dataset>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_DATASET_REFERENCES,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> p.getDatasets().add(rel),
rel -> rel.getInstances().get(0).getUrl());
}
}

View File

@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> {
public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
public EnrichMissingOpenAccess() {
super(true);

View File

@ -11,7 +11,7 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPid extends UpdateMatcher<Pid> {
public class EnrichMissingPid extends UpdateMatcher<Result, Pid> {
public EnrichMissingPid() {
super(true);

View File

@ -4,30 +4,35 @@ package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Project;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingProject extends UpdateMatcher<Project> {
public class EnrichMissingProject
extends UpdateMatcher<Pair<Result, List<Project>>, eu.dnetlib.broker.objects.Project> {
public EnrichMissingProject() {
super(true);
}
@Override
protected List<UpdateInfo<Project>> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
final Pair<Result, List<Project>> target) {
// TODO
return Arrays.asList();
}
@Override
public UpdateInfo<Project> generateUpdateInfo(final Project highlightValue,
final Result source,
final Result target) {
public UpdateInfo<eu.dnetlib.broker.objects.Project> generateUpdateInfo(
final eu.dnetlib.broker.objects.Project highlightValue,
final Pair<Result, List<Project>> source,
final Pair<Result, List<Project>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PROJECT,
highlightValue, source, target,
highlightValue, source.getLeft(), target.getLeft(),
(p, prj) -> p.getProjects().add(prj),
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
}

View File

@ -9,7 +9,7 @@ import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
public class EnrichMissingPublicationDate extends UpdateMatcher<Result, String> {
public EnrichMissingPublicationDate() {
super(false);

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationIsReferencedBy
extends UpdateMatcher<Pair<Result, List<Publication>>, eu.dnetlib.broker.objects.Publication> {
public EnrichMissingPublicationIsReferencedBy() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Publication>> findUpdates(
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
// TODO Auto-generated method stub
return Arrays.asList();
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Publication> generateUpdateInfo(
final eu.dnetlib.broker.objects.Publication highlightValue,
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> {
}, // p.getPublications().add(rel), //TODO available in the future release of dnet-openaire-broker-common
rel -> rel.getOriginalId());
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationIsRelatedTo
extends UpdateMatcher<Pair<Result, List<Publication>>, eu.dnetlib.broker.objects.Publication> {
public EnrichMissingPublicationIsRelatedTo() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Publication>> findUpdates(
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
// TODO Auto-generated method stub
return Arrays.asList();
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Publication> generateUpdateInfo(
final eu.dnetlib.broker.objects.Publication highlightValue,
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_IS_RELATED_TO,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> {
}, // p.getPublications().add(rel), //TODO available in the future release of dnet-openaire-broker-common
rel -> rel.getOriginalId());
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationIsSupplementedBy
extends UpdateMatcher<Pair<Result, List<Publication>>, eu.dnetlib.broker.objects.Publication> {
public EnrichMissingPublicationIsSupplementedBy() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Publication>> findUpdates(
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
// TODO Auto-generated method stub
return Arrays.asList();
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Publication> generateUpdateInfo(
final eu.dnetlib.broker.objects.Publication highlightValue,
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> {
}, // p.getPublications().add(rel), //TODO available in the future release of dnet-openaire-broker-common
rel -> rel.getOriginalId());
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationIsSupplementedTo
extends UpdateMatcher<Pair<Result, List<Publication>>, eu.dnetlib.broker.objects.Publication> {
public EnrichMissingPublicationIsSupplementedTo() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Publication>> findUpdates(
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
// TODO Auto-generated method stub
return Arrays.asList();
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Publication> generateUpdateInfo(
final eu.dnetlib.broker.objects.Publication highlightValue,
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> {
}, // p.getPublications().add(rel), //TODO available in the future release of dnet-openaire-broker-common
rel -> rel.getOriginalId());
}
}

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationReferences
extends UpdateMatcher<Pair<Result, List<Publication>>, eu.dnetlib.broker.objects.Publication> {
public EnrichMissingPublicationReferences() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Publication>> findUpdates(
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
// TODO Auto-generated method stub
return Arrays.asList();
}
@Override
protected UpdateInfo<eu.dnetlib.broker.objects.Publication> generateUpdateInfo(
final eu.dnetlib.broker.objects.Publication highlightValue,
final Pair<Result, List<Publication>> source,
final Pair<Result, List<Publication>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_REFERENCES,
highlightValue, source.getLeft(), target.getLeft(),
(p, rel) -> {
}, // p.getPublications().add(rel), //TODO available in the future release of dnet-openaire-broker-common
rel -> rel.getOriginalId());
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
public class EnrichMissingSoftware
extends UpdateMatcher<Pair<Result, List<Software>>, eu.dnetlib.broker.objects.Software> {
public EnrichMissingSoftware() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
final Pair<Result, List<Software>> source,
final Pair<Result, List<Software>> target) {
// TODO
return Arrays.asList();
}
@Override
public UpdateInfo<eu.dnetlib.broker.objects.Software> generateUpdateInfo(
final eu.dnetlib.broker.objects.Software highlightValue,
final Pair<Result, List<Software>> source,
final Pair<Result, List<Software>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_SOFTWARE,
highlightValue, source.getLeft(), target.getLeft(),
(p, s) -> p.getSoftwares().add(s),
s -> s.getName());
}
}

View File

@ -14,7 +14,7 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class EnrichMissingSubject extends UpdateMatcher<Pair<String, String>> {
public class EnrichMissingSubject extends UpdateMatcher<Result, Pair<String, String>> {
public EnrichMissingSubject() {
super(true);

View File

@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> {
public class EnrichMoreOpenAccess extends UpdateMatcher<Result, Instance> {
public EnrichMoreOpenAccess() {
super(true);

View File

@ -11,7 +11,7 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMorePid extends UpdateMatcher<Pid> {
public class EnrichMorePid extends UpdateMatcher<Result, Pid> {
public EnrichMorePid() {
super(true);

View File

@ -0,0 +1,39 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreProject extends UpdateMatcher<Pair<Result, List<Project>>, eu.dnetlib.broker.objects.Project> {
public EnrichMoreProject() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
final Pair<Result, List<Project>> target) {
// TODO
return Arrays.asList();
}
@Override
public UpdateInfo<eu.dnetlib.broker.objects.Project> generateUpdateInfo(
final eu.dnetlib.broker.objects.Project highlightValue,
final Pair<Result, List<Project>> source,
final Pair<Result, List<Project>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MORE_PROJECT,
highlightValue, source.getLeft(), target.getLeft(),
(p, prj) -> p.getProjects().add(prj),
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
}
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
public class EnrichMoreSoftware
extends UpdateMatcher<Pair<Result, List<Software>>, eu.dnetlib.broker.objects.Software> {
public EnrichMoreSoftware() {
super(true);
}
@Override
protected List<UpdateInfo<eu.dnetlib.broker.objects.Software>> findUpdates(
final Pair<Result, List<Software>> source,
final Pair<Result, List<Software>> target) {
// TODO
return Arrays.asList();
}
@Override
public UpdateInfo<eu.dnetlib.broker.objects.Software> generateUpdateInfo(
final eu.dnetlib.broker.objects.Software highlightValue,
final Pair<Result, List<Software>> source,
final Pair<Result, List<Software>> target) {
return new UpdateInfo<>(
Topic.ENRICH_MORE_SOFTWARE,
highlightValue, source.getLeft(), target.getLeft(),
(p, s) -> p.getSoftwares().add(s),
s -> s.getName());
}
}

View File

@ -12,7 +12,7 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreSubject extends UpdateMatcher<Pair<String, String>> {
public class EnrichMoreSubject extends UpdateMatcher<Result, Pair<String, String>> {
public EnrichMoreSubject() {
super(true);

View File

@ -12,9 +12,8 @@ import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Result;
public abstract class UpdateMatcher<T> {
public abstract class UpdateMatcher<K, T> {
private final boolean multipleUpdate;
@ -22,15 +21,16 @@ public abstract class UpdateMatcher<T> {
this.multipleUpdate = multipleUpdate;
}
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final Result res, final Result... others) {
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final K res, final Collection<K> others) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
for (final Result source : others) {
for (final K source : others) {
if (source != res) {
for (final UpdateInfo<T> info : findUpdates(source, res)) {
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {} else {
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else {
infoMap.put(s, info);
}
}
@ -51,11 +51,11 @@ public abstract class UpdateMatcher<T> {
}
}
protected abstract List<UpdateInfo<T>> findUpdates(Result source, Result target);
protected abstract List<UpdateInfo<T>> findUpdates(K source, K target);
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue,
final Result source,
final Result target);
final K source,
final K target);
protected static boolean isMissing(final List<Field<String>> list) {
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());

View File

@ -4,4 +4,6 @@ package eu.dnetlib.dhp.broker.oa.util;
public class BrokerConstants {
public final static String OPEN_ACCESS = "OPEN";
public final static String IS_MERGED_IN_CLASS = "isMergedIn";
}

View File

@ -0,0 +1,164 @@
package eu.dnetlib.dhp.oa.dedup;
import java.text.Normalizer;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.pace.model.Person;
import scala.Tuple2;
public class AuthorMerger {
private static final Double THRESHOLD = 0.95;
public static List<Author> merge(List<List<Author>> authors) {
authors.sort(new Comparator<List<Author>>() {
@Override
public int compare(List<Author> o1, List<Author> o2) {
return -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2));
}
});
List<Author> author = new ArrayList<>();
for (List<Author> a : authors) {
author = mergeAuthor(author, a);
}
return author;
}
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
List<Author> base, enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
if (pa == pb) {
base = sa > sb ? a : b;
enrich = sa > sb ? b : a;
} else {
base = pa > pb ? a : b;
enrich = pa > pb ? b : a;
}
enrichPidFromList(base, enrich);
return base;
}
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
if (base == null || enrich == null)
return;
final Map<String, Author> basePidAuthorMap = base
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
pidToEnrich
.forEach(
a -> {
Optional<Tuple2<Double, Author>> simAuthor = base
.stream()
.map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
.max(Comparator.comparing(Tuple2::_1));
if (simAuthor.isPresent() && simAuthor.get()._1() > THRESHOLD) {
Author r = simAuthor.get()._2();
if (r.getPid() == null) {
r.setPid(new ArrayList<>());
}
r.getPid().add(a._1());
}
});
}
public static String pidToComparableString(StructuredProperty pid) {
return (pid.getQualifier() != null
? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
: "") + (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
}
public static int countAuthorsPids(List<Author> authors) {
if (authors == null)
return 0;
return (int) authors.stream().filter(AuthorMerger::hasPid).count();
}
private static int authorsSize(List<Author> authors) {
if (authors == null)
return 0;
return authors.size();
}
private static Double sim(Author a, Author b) {
final Person pa = parse(a);
final Person pb = parse(b);
if (pa.isAccurate() & pb.isAccurate()) {
return new JaroWinkler()
.score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString()));
} else {
return new JaroWinkler()
.score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
}
}
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().size() == 0)
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
private static Person parse(Author author) {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
return new Person(author.getFullname(), false);
}
}
private static String normalize(final String s) {
return nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
// in case
// of large input strings
.replaceAll("(\\W)+", " ")
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
.trim();
}
private static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
}

View File

@ -1,8 +1,10 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
@ -67,16 +69,19 @@ public class DedupRecordFactory {
(MapFunction<Tuple2<String, T>, String>) entity -> entity._1(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<String, T>, T>) (key,
values) -> entityMerger(key, values, ts, dataInfo),
values) -> entityMerger(key, values, ts, dataInfo, clazz),
Encoders.bean(clazz));
}
private static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) {
public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz)
throws IllegalAccessException, InstantiationException {
T entity = entities.next()._2();
T entity = clazz.newInstance();
final Collection<String> dates = Lists.newArrayList();
final List<List<Author>> authors = Lists.newArrayList();
entities
.forEachRemaining(
t -> {
@ -84,17 +89,17 @@ public class DedupRecordFactory {
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
Result er = (Result) entity;
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
if (r1.getDateofacceptance() != null) {
if (r1.getAuthor() != null && r1.getAuthor().size() > 0)
authors.add(r1.getAuthor());
if (r1.getDateofacceptance() != null)
dates.add(r1.getDateofacceptance().getValue());
}
}
});
// set authors and date
if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates));
((Result) entity).setAuthor(AuthorMerger.merge(authors));
}
entity.setId(id);

View File

@ -32,7 +32,6 @@ import eu.dnetlib.pace.model.Person;
import scala.Tuple2;
public class DedupUtility {
private static final Double THRESHOLD = 0.95;
public static Map<String, LongAccumulator> constructAccumulator(
final DedupConfig dedupConf, final SparkContext context) {
@ -82,61 +81,6 @@ public class DedupUtility {
}
}
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
List<Author> base, enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
if (pa == pb) {
base = sa > sb ? a : b;
enrich = sa > sb ? b : a;
} else {
base = pa > pb ? a : b;
enrich = pa > pb ? b : a;
}
enrichPidFromList(base, enrich);
return base;
}
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
if (base == null || enrich == null)
return;
final Map<String, Author> basePidAuthorMap = base
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(a -> a.getPid().stream().map(p -> new Tuple2<>(p.toComparableString(), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.filter(p -> !basePidAuthorMap.containsKey(p.toComparableString()))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
pidToEnrich
.forEach(
a -> {
Optional<Tuple2<Double, Author>> simAuhtor = base
.stream()
.map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
.max(Comparator.comparing(Tuple2::_1));
if (simAuhtor.isPresent() && simAuhtor.get()._1() > THRESHOLD) {
Author r = simAuhtor.get()._2();
if (r.getPid() == null) {
r.setPid(new ArrayList<>());
}
r.getPid().add(a._1());
}
});
}
public static String createDedupRecordPath(
final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType);
@ -156,65 +100,6 @@ public class DedupUtility {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
}
private static Double sim(Author a, Author b) {
final Person pa = parse(a);
final Person pb = parse(b);
if (pa.isAccurate() & pb.isAccurate()) {
return new JaroWinkler()
.score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString()));
} else {
return new JaroWinkler()
.score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
}
}
private static String normalize(final String s) {
return nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
// in case
// of large input strings
.replaceAll("(\\W)+", " ")
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
.trim();
}
private static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
private static Person parse(Author author) {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
return new Person(author.getFullname(), false);
}
}
private static int countAuthorsPids(List<Author> authors) {
if (authors == null)
return 0;
return (int) authors.stream().filter(DedupUtility::hasPid).count();
}
private static int authorsSize(List<Author> authors) {
if (authors == null)
return 0;
return authors.size();
}
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().size() == 0)
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator)
throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);

View File

@ -0,0 +1,139 @@
package eu.dnetlib.dhp.oa.dedup;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.*;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
public class EntityMergerTest implements Serializable {
List<Tuple2<String, Publication>> publications;
String testEntityBasePath;
DataInfo dataInfo;
String dedupId = "dedup_id";
Publication pub_top;
@BeforeEach
public void setUp() throws Exception {
testEntityBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
.toFile()
.getAbsolutePath();
publications = readSample(testEntityBasePath + "/publication_merge.json", Publication.class);
pub_top = getTopPub(publications);
dataInfo = setDI();
}
@Test
public void publicationMergerTest() throws InstantiationException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
assertEquals(dedupId, pub_merged.getId());
assertEquals(pub_merged.getJournal(), pub_top.getJournal());
assertEquals(pub_merged.getBestaccessright(), pub_top.getBestaccessright());
assertEquals(pub_merged.getResulttype(), pub_top.getResulttype());
assertEquals(pub_merged.getLanguage(), pub_merged.getLanguage());
assertEquals(pub_merged.getPublisher(), pub_top.getPublisher());
assertEquals(pub_merged.getEmbargoenddate(), pub_top.getEmbargoenddate());
assertEquals(pub_merged.getResourcetype().getClassid(), "0004");
assertEquals(pub_merged.getDateoftransformation(), pub_top.getDateoftransformation());
assertEquals(pub_merged.getOaiprovenance(), pub_top.getOaiprovenance());
assertEquals(pub_merged.getDateofcollection(), pub_top.getDateofcollection());
assertEquals(pub_merged.getInstance().size(), 3);
assertEquals(pub_merged.getCountry().size(), 2);
assertEquals(pub_merged.getSubject().size(), 0);
assertEquals(pub_merged.getTitle().size(), 2);
assertEquals(pub_merged.getRelevantdate().size(), 0);
assertEquals(pub_merged.getDescription().size(), 0);
assertEquals(pub_merged.getSource().size(), 0);
assertEquals(pub_merged.getFulltext().size(), 0);
assertEquals(pub_merged.getFormat().size(), 0);
assertEquals(pub_merged.getContributor().size(), 0);
assertEquals(pub_merged.getCoverage().size(), 0);
assertEquals(pub_merged.getContext().size(), 0);
assertEquals(pub_merged.getExternalReference().size(), 0);
assertEquals(pub_merged.getOriginalId().size(), 3);
assertEquals(pub_merged.getCollectedfrom().size(), 3);
assertEquals(pub_merged.getPid().size(), 1);
assertEquals(pub_merged.getExtraInfo().size(), 0);
// verify datainfo
assertEquals(pub_merged.getDataInfo(), dataInfo);
// verify datepicker
assertEquals(pub_merged.getDateofacceptance().getValue(), "2018-09-30");
// verify authors
assertEquals(pub_merged.getAuthor().size(), 9);
assertEquals(AuthorMerger.countAuthorsPids(pub_merged.getAuthor()), 4);
}
public DataInfo setDI() {
DataInfo dataInfo = new DataInfo();
dataInfo.setTrust("0.9");
dataInfo.setDeletedbyinference(false);
dataInfo.setInferenceprovenance("testing");
dataInfo.setInferred(true);
return dataInfo;
}
public Publication getTopPub(List<Tuple2<String, Publication>> publications) {
Double maxTrust = 0.0;
Publication maxPub = new Publication();
for (Tuple2<String, Publication> publication : publications) {
Double pubTrust = Double.parseDouble(publication._2().getDataInfo().getTrust());
if (pubTrust > maxTrust) {
maxTrust = pubTrust;
maxPub = publication._2();
}
}
return maxPub;
}
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
List<Tuple2<String, T>> res = new ArrayList<>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(path));
String line = reader.readLine();
while (line != null) {
res
.add(
new Tuple2<>(
MapDocumentUtil.getJPathString("$.id", line),
new ObjectMapper().readValue(line, clazz)));
// read next line
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
}

View File

@ -1,54 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import eu.dnetlib.dhp.schema.oaf.Publication;
public class MergeAuthorTest {
private List<Publication> publicationsToMerge;
private final ObjectMapper mapper = new ObjectMapper();
@BeforeEach
public void setUp() throws Exception {
final String json = IOUtils
.toString(
this.getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/json/authors_merge.json"));
publicationsToMerge = Arrays
.asList(json.split("\n"))
.stream()
.map(
s -> {
try {
return mapper.readValue(s, Publication.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}
// FIX ME Michele DB this tests doesn't work
// @Test
public void test() throws Exception {
Publication dedup = new Publication();
publicationsToMerge
.forEach(
p -> {
dedup.mergeFrom(p);
dedup.setAuthor(DedupUtility.mergeAuthor(dedup.getAuthor(), p.getAuthor()));
});
System.out.println(mapper.writeValueAsString(dedup));
}
}

File diff suppressed because one or more lines are too long

View File

@ -10,7 +10,16 @@ import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import java.util.ArrayList;
import java.util.Arrays;
@ -50,6 +59,10 @@ public abstract class AbstractMdRecordToOafMapper {
protected static final String DATACITE_SCHEMA_KERNEL_4 = "http://datacite.org/schema/kernel-4";
protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
protected static final Qualifier ORCID_PID_TYPE = qualifier(
"ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES);
protected static final Qualifier MAG_PID_TYPE = qualifier(
"MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
protected static final Map<String, String> nsContext = new HashMap<>();
@ -75,8 +88,7 @@ public abstract class AbstractMdRecordToOafMapper {
DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
final Document doc = DocumentHelper
.parseText(
xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3));
.parseText(xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3));
final String type = doc.valueOf("//dr:CobjCategory/@type");
final KeyValue collectedFrom = getProvenanceDatasource(
@ -103,7 +115,7 @@ public abstract class AbstractMdRecordToOafMapper {
}
}
private KeyValue getProvenanceDatasource(Document doc, String xpathId, String xpathName) {
private KeyValue getProvenanceDatasource(final Document doc, final String xpathId, final String xpathName) {
final String dsId = doc.valueOf(xpathId);
final String dsName = doc.valueOf(xpathName);
@ -111,9 +123,7 @@ public abstract class AbstractMdRecordToOafMapper {
return null;
}
return keyValue(
createOpenaireId(10, dsId, true),
dsName);
return keyValue(createOpenaireId(10, dsId, true), dsName);
}
protected List<Oaf> createOafs(
@ -211,8 +221,14 @@ public abstract class AbstractMdRecordToOafMapper {
return res;
}
protected Relation getRelation(String source, String target, String relType, String subRelType, String relClass,
KeyValue collectedFrom, DataInfo info, long lastUpdateTimestamp) {
protected Relation getRelation(final String source,
final String target,
final String relType,
final String subRelType,
final String relClass,
final KeyValue collectedFrom,
final DataInfo info,
final long lastUpdateTimestamp) {
final Relation rel = new Relation();
rel.setRelType(relType);
rel.setSubRelType(subRelType);
@ -289,7 +305,10 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);
protected abstract List<Instance> prepareInstances(
Document doc, DataInfo info, KeyValue collectedfrom, KeyValue hostedby);
Document doc,
DataInfo info,
KeyValue collectedfrom,
KeyValue hostedby);
protected abstract List<Field<String>> prepareSources(Document doc, DataInfo info);
@ -314,13 +333,16 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract List<Author> prepareAuthors(Document doc, DataInfo info);
protected abstract List<Field<String>> prepareOtherResearchProductTools(
Document doc, DataInfo info);
Document doc,
DataInfo info);
protected abstract List<Field<String>> prepareOtherResearchProductContactGroups(
Document doc, DataInfo info);
Document doc,
DataInfo info);
protected abstract List<Field<String>> prepareOtherResearchProductContactPersons(
Document doc, DataInfo info);
Document doc,
DataInfo info);
protected abstract Qualifier prepareSoftwareProgrammingLanguage(Document doc, DataInfo info);
@ -329,7 +351,8 @@ public abstract class AbstractMdRecordToOafMapper {
protected abstract List<StructuredProperty> prepareSoftwareLicenses(Document doc, DataInfo info);
protected abstract List<Field<String>> prepareSoftwareDocumentationUrls(
Document doc, DataInfo info);
Document doc,
DataInfo info);
protected abstract List<GeoLocation> prepareDatasetGeoLocations(Document doc, DataInfo info);
@ -358,26 +381,17 @@ public abstract class AbstractMdRecordToOafMapper {
final String vol = n.valueOf("@vol");
final String edition = n.valueOf("@edition");
if (StringUtils.isNotBlank(name)) {
return journal(
name,
issnPrinted,
issnOnline,
issnLinking,
ep,
iss,
sp,
vol,
edition,
null,
null,
info);
return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info);
}
}
return null;
}
protected Qualifier prepareQualifier(
final Node node, final String xpath, final String schemeId, final String schemeName) {
final Node node,
final String xpath,
final String schemeId,
final String schemeName) {
final String classId = node.valueOf(xpath);
final String className = code2name.get(classId);
return qualifier(classId, className, schemeId, schemeName);
@ -401,7 +415,10 @@ public abstract class AbstractMdRecordToOafMapper {
}
protected List<StructuredProperty> prepareListStructProps(
final Node node, final String xpath, final Qualifier qualifier, final DataInfo info) {
final Node node,
final String xpath,
final Qualifier qualifier,
final DataInfo info) {
final List<StructuredProperty> res = new ArrayList<>();
for (final Object o : node.selectNodes(xpath)) {
final Node n = (Node) o;
@ -411,19 +428,17 @@ public abstract class AbstractMdRecordToOafMapper {
}
protected List<StructuredProperty> prepareListStructProps(
final Node node, final String xpath, final DataInfo info) {
final Node node,
final String xpath,
final DataInfo info) {
final List<StructuredProperty> res = new ArrayList<>();
for (final Object o : node.selectNodes(xpath)) {
final Node n = (Node) o;
res
.add(
structuredProperty(
n.getText(),
n.valueOf("@classid"),
n.valueOf("@classname"),
n.valueOf("@schemeid"),
n.valueOf("@schemename"),
info));
n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"),
n.valueOf("@schemename"), info));
}
return res;
}
@ -449,8 +464,7 @@ public abstract class AbstractMdRecordToOafMapper {
final Node n = doc.selectSingleNode("//oaf:datainfo");
if (n == null) {
return dataInfo(
false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9");
return dataInfo(false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9");
}
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
@ -464,12 +478,8 @@ public abstract class AbstractMdRecordToOafMapper {
final String trust = n.valueOf("./oaf:trust");
return dataInfo(
deletedbyinference,
inferenceprovenance,
inferred,
false,
qualifier(paClassId, paClassName, paSchemeId, paSchemeName),
trust);
deletedbyinference, inferenceprovenance, inferred, false,
qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
}
protected Field<String> prepareField(final Node node, final String xpath, final DataInfo info) {
@ -477,7 +487,9 @@ public abstract class AbstractMdRecordToOafMapper {
}
protected List<Field<String>> prepareListFields(
final Node node, final String xpath, final DataInfo info) {
final Node node,
final String xpath,
final DataInfo info) {
return listFields(info, prepareListString(node, xpath));
}

View File

@ -1,10 +1,19 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_LANGUAGES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PUBLICATION_RESOURCE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DATASET;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -15,8 +24,15 @@ import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.graph.raw.common.PacePerson;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@ -39,14 +55,25 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
author.setSurname(p.getNormalisedSurname());
}
final String pid = e.attributeValue("nameIdentifier");
final String pidType = e.attributeValue("nameIdentifierScheme");
final String pid = e.valueOf("./@nameIdentifier");
final String type = e
.valueOf("./@nameIdentifierScheme")
.trim()
.toUpperCase()
.replaceAll(" ", "")
.replaceAll("_", "");
author.setPid(new ArrayList<>());
if (StringUtils.isNotBlank(pid) && StringUtils.isNotBlank(pidType)) {
author
.getPid()
.add(structuredProperty(pid, qualifier(pidType, pidType, DNET_PID_TYPES, DNET_PID_TYPES), info));
if (StringUtils.isNotBlank(pid)) {
if (type.startsWith("ORCID")) {
final String cleanedId = pid
.replaceAll("http://orcid.org/", "")
.replaceAll("https://orcid.org/", "");
author.getPid().add(structuredProperty(cleanedId, ORCID_PID_TYPE, info));
} else if (type.startsWith("MAGID")) {
author.getPid().add(structuredProperty(pid, MAG_PID_TYPE, info));
}
}
res.add(author);
@ -104,28 +131,21 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final Instance instance = new Instance();
instance
.setInstancetype(
prepareQualifier(
doc,
"//dr:CobjCategory",
DNET_PUBLICATION_RESOURCE,
DNET_PUBLICATION_RESOURCE));
prepareQualifier(doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
instance
.setAccessright(
prepareQualifier(doc, "//oaf:accessrights", DNET_ACCESS_MODES, DNET_ACCESS_MODES));
.setAccessright(prepareQualifier(doc, "//oaf:accessrights", DNET_ACCESS_MODES, DNET_ACCESS_MODES));
instance.setLicense(field(doc.valueOf("//oaf:license"), info));
instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
instance
.setProcessingchargeamount(
field(doc.valueOf("//oaf:processingchargeamount"), info));
.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
instance
.setProcessingchargecurrency(
field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
List<Node> nodes = Lists.newArrayList(doc.selectNodes("//dc:identifier"));
final List<Node> nodes = Lists.newArrayList(doc.selectNodes("//dc:identifier"));
instance
.setUrl(
nodes
@ -158,19 +178,22 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected Field<String> prepareSoftwareCodeRepositoryUrl(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return null; // NOT PRESENT IN OAF
}
@Override
protected List<StructuredProperty> prepareSoftwareLicenses(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // NOT PRESENT IN OAF
}
@Override
protected List<Field<String>> prepareSoftwareDocumentationUrls(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // NOT PRESENT IN OAF
}
@ -182,13 +205,15 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected Field<String> prepareDatasetMetadataVersionNumber(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return null; // NOT PRESENT IN OAF
}
@Override
protected Field<String> prepareDatasetLastMetadataUpdate(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return null; // NOT PRESENT IN OAF
}
@ -216,19 +241,22 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<Field<String>> prepareOtherResearchProductTools(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // NOT PRESENT IN OAF
}
@Override
protected List<Field<String>> prepareOtherResearchProductContactGroups(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // NOT PRESENT IN OAF
}
@Override
protected List<Field<String>> prepareOtherResearchProductContactPersons(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // NOT PRESENT IN OAF
}

View File

@ -4,16 +4,31 @@ package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_DATA_CITE_DATE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_DATA_CITE_RESOURCE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_LANGUAGES;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PUBLICATION_RESOURCE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.HAS_PARTS;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PART_OF;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_SUPPLEMENTED_BY;
import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_SUPPLEMENT_TO;
import static eu.dnetlib.dhp.schema.common.ModelConstants.PART;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SUPPLEMENT;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Node;
import eu.dnetlib.dhp.oa.graph.raw.common.PacePerson;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
@ -22,7 +37,6 @@ import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@ -48,7 +62,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final String fullname = n.valueOf("./datacite:creatorName");
author.setFullname(fullname);
PacePerson pp = new PacePerson(fullname, false);
final PacePerson pp = new PacePerson(fullname, false);
final String name = n.valueOf("./datacite:givenName");
if (StringUtils.isBlank(name) & pp.isAccurate()) {
author.setName(pp.getNormalisedFirstName());
@ -63,6 +77,10 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
author.setSurname(surname);
}
if (StringUtils.isBlank(author.getFullname())) {
author.setFullname(String.format("%s, %s", author.getSurname(), author.getName()));
}
author.setAffiliation(prepareListFields(n, "./datacite:affiliation", info));
author.setPid(preparePids(n, info));
author.setRank(pos++);
@ -74,13 +92,21 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
private List<StructuredProperty> preparePids(final Node n, final DataInfo info) {
final List<StructuredProperty> res = new ArrayList<>();
for (final Object o : n.selectNodes("./datacite:nameIdentifier")) {
res
.add(
structuredProperty(
((Node) o).getText(),
prepareQualifier(
(Node) o, "./@nameIdentifierScheme", DNET_PID_TYPES, DNET_PID_TYPES),
info));
final String id = ((Node) o).getText();
final String type = ((Node) o)
.valueOf("./@nameIdentifierScheme")
.trim()
.toUpperCase()
.replaceAll(" ", "")
.replaceAll("_", "");
if (type.startsWith("ORCID")) {
final String cleanedId = id.replaceAll("http://orcid.org/", "").replaceAll("https://orcid.org/", "");
res.add(structuredProperty(cleanedId, ORCID_PID_TYPE, info));
} else if (type.startsWith("MAGID")) {
res.add(structuredProperty(id, MAG_PID_TYPE, info));
}
}
return res;
}
@ -95,21 +121,18 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
final Instance instance = new Instance();
instance
.setInstancetype(
prepareQualifier(
doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
prepareQualifier(doc, "//dr:CobjCategory", DNET_PUBLICATION_RESOURCE, DNET_PUBLICATION_RESOURCE));
instance.setCollectedfrom(collectedfrom);
instance.setHostedby(hostedby);
instance.setDateofacceptance(field(doc.valueOf("//oaf:dateAccepted"), info));
instance.setDistributionlocation(doc.valueOf("//oaf:distributionlocation"));
instance
.setAccessright(
prepareQualifier(doc, "//oaf:accessrights", DNET_ACCESS_MODES, DNET_ACCESS_MODES));
.setAccessright(prepareQualifier(doc, "//oaf:accessrights", DNET_ACCESS_MODES, DNET_ACCESS_MODES));
instance.setLicense(field(doc.valueOf("//oaf:license"), info));
instance.setRefereed(field(doc.valueOf("//oaf:refereed"), info));
instance.setProcessingchargeamount(field(doc.valueOf("//oaf:processingchargeamount"), info));
instance
.setProcessingchargecurrency(
field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
.setProcessingchargecurrency(field(doc.valueOf("//oaf:processingchargeamount/@currency"), info));
final Set<String> url = new HashSet<>();
for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) {
@ -149,11 +172,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
res
.add(
structuredProperty(
((Node) o).getText(),
"UNKNOWN",
"UNKNOWN",
DNET_DATA_CITE_DATE,
DNET_DATA_CITE_DATE,
((Node) o).getText(), "UNKNOWN", "UNKNOWN", DNET_DATA_CITE_DATE, DNET_DATA_CITE_DATE,
info));
}
}
@ -197,53 +216,52 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected List<Field<String>> prepareOtherResearchProductTools(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // Not present in ODF ???
}
@Override
protected List<Field<String>> prepareOtherResearchProductContactGroups(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return prepareListFields(
doc,
"//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName",
info);
doc, "//datacite:contributor[@contributorType='ContactGroup']/datacite:contributorName", info);
}
@Override
protected List<Field<String>> prepareOtherResearchProductContactPersons(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return prepareListFields(
doc,
"//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName",
info);
doc, "//datacite:contributor[@contributorType='ContactPerson']/datacite:contributorName", info);
}
@Override
protected Qualifier prepareSoftwareProgrammingLanguage(final Document doc, final DataInfo info) {
return prepareQualifier(
doc, "//datacite:format", "dnet:programming_languages", "dnet:programming_languages");
return prepareQualifier(doc, "//datacite:format", "dnet:programming_languages", "dnet:programming_languages");
}
@Override
protected Field<String> prepareSoftwareCodeRepositoryUrl(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return null; // Not present in ODF ???
}
@Override
protected List<StructuredProperty> prepareSoftwareLicenses(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return new ArrayList<>(); // Not present in ODF ???
}
@Override
protected List<Field<String>> prepareSoftwareDocumentationUrls(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return prepareListFields(
doc,
"//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']",
info);
doc, "//datacite:relatedIdentifier[@relatedIdentifierType='URL' and @relationType='IsDocumentedBy']", info);
}
// DATASETS
@ -264,13 +282,15 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected Field<String> prepareDatasetMetadataVersionNumber(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return null; // Not present in ODF ???
}
@Override
protected Field<String> prepareDatasetLastMetadataUpdate(
final Document doc, final DataInfo info) {
final Document doc,
final DataInfo info) {
return prepareField(doc, "//datacite:date[@dateType='Updated']", info);
}
@ -346,9 +366,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
@Override
protected Qualifier prepareResourceType(final Document doc, final DataInfo info) {
return prepareQualifier(
doc,
"//*[local-name() = 'resource']//*[local-name() = 'resourceType']",
DNET_DATA_CITE_RESOURCE,
doc, "//*[local-name() = 'resource']//*[local-name() = 'resourceType']", DNET_DATA_CITE_RESOURCE,
DNET_DATA_CITE_RESOURCE);
}
}

View File

@ -22,12 +22,13 @@ SELECT
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@' || o.country || '@@@dnet:countries@@@dnet:countries' AS country,
o.country || '@@@' || COALESCE(cntr.name,o.country) || '@@@dnet:countries@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
LEFT OUTER JOIN class cntr ON (cntr.code = o.country)

View File

@ -21,7 +21,14 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
@ExtendWith(MockitoExtension.class)
public class MappersTest {
@ -54,13 +61,13 @@ public class MappersTest {
assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue()));
assertTrue(p.getAuthor().size() > 0);
Optional<Author> author = p
final Optional<Author> author = p
.getAuthor()
.stream()
.filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.findFirst();
assertTrue(author.isPresent());
StructuredProperty pid = author
final StructuredProperty pid = author
.get()
.getPid()
.stream()
@ -68,7 +75,7 @@ public class MappersTest {
.get();
assertEquals("0000-0001-6651-1178", pid.getValue());
assertEquals("ORCID", pid.getQualifier().getClassid());
assertEquals("ORCID", pid.getQualifier().getClassname());
assertEquals("Open Researcher and Contributor ID", pid.getQualifier().getClassname());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename());
assertEquals("Votsi,Nefta", author.get().getFullname());
@ -121,13 +128,13 @@ public class MappersTest {
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertTrue(d.getAuthor().size() > 0);
Optional<Author> author = d
final Optional<Author> author = d
.getAuthor()
.stream()
.filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.findFirst();
assertTrue(author.isPresent());
StructuredProperty pid = author
final StructuredProperty pid = author
.get()
.getPid()
.stream()
@ -135,7 +142,7 @@ public class MappersTest {
.get();
assertEquals("0000-0001-9074-1619", pid.getValue());
assertEquals("ORCID", pid.getQualifier().getClassid());
assertEquals("ORCID", pid.getQualifier().getClassname());
assertEquals("Open Researcher and Contributor ID", pid.getQualifier().getClassname());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename());
assertEquals("Baracchini, Theo", author.get().getFullname());
@ -143,13 +150,13 @@ public class MappersTest {
assertEquals("Theo", author.get().getName());
assertEquals(1, author.get().getAffiliation().size());
Optional<Field<String>> opAff = author
final Optional<Field<String>> opAff = author
.get()
.getAffiliation()
.stream()
.findFirst();
assertTrue(opAff.isPresent());
Field<String> affiliation = opAff.get();
final Field<String> affiliation = opAff.get();
assertEquals("ISTI-CNR", affiliation.getValue());
assertTrue(d.getSubject().size() > 0);

View File

@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -19,8 +21,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
@ -58,6 +62,8 @@ public class PrepareRelationsJob {
public static final int MAX_RELS = 100;
public static final int DEFAULT_NUM_PARTITIONS = 3000;
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
@ -79,6 +85,24 @@ public class PrepareRelationsJob {
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
int relPartitions = Optional
.ofNullable(parser.get("relPartitions"))
.map(Integer::valueOf)
.orElse(DEFAULT_NUM_PARTITIONS);
log.info("relPartitions: {}", relPartitions);
Set<String> relationFilter = Optional
.ofNullable(parser.get("relationFilter"))
.map(s -> Sets.newHashSet(Splitter.on(",").split(s)))
.orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter);
int maxRelations = Optional
.ofNullable(parser.get("maxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
log.info("maxRelations: {}", maxRelations);
SparkConf conf = new SparkConf();
runWithSparkSession(
@ -86,25 +110,74 @@ public class PrepareRelationsJob {
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations);
});
}
private static void prepareRelationsFromPaths(
SparkSession spark, String inputRelationsPath, String outputPath) {
/**
* Dataset based implementation that prepares the graph relations by limiting the number of outgoing links and
* filtering the relation types according to the given criteria.
*
* @param spark the spark session
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
*/
private static void prepareRelations(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter,
int maxRelations) {
readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.filter((FilterFunction<SortableRelation>) rel -> !relationFilter.contains(rel.getRelClass()))
.groupByKey(
(MapFunction<SortableRelation, String>) value -> value.getSource(), Encoders.STRING())
.flatMapGroups(
(FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (key, values) -> Iterators
.limit(values, MAX_RELS),
.limit(values, maxRelations),
Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering
* the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are
* prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation.
*
* @param spark the spark session
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
*/
// TODO work in progress
private static void prepareRelationsRDD(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions,
int maxRelations) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions());
// only consider those that are not virtually deleted
RDD<SortableRelation> d = rels
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
.mapToPair(
(PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
.groupByKey(partitioner)
.map(group -> Iterables.limit(group._2(), maxRelations))
.flatMap(group -> group.iterator())
.rdd();
spark
.createDataset(d, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* file,
@ -123,31 +196,6 @@ public class PrepareRelationsJob {
Encoders.bean(SortableRelation.class));
}
// TODO work in progress
private static void prepareRelationsRDDFromPaths(
SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(numPartitions);
RDD<SortableRelation> d = rels
.filter(rel -> !rel.getDataInfo().getDeletedbyinference()) // only
// consider
// those
// that are not virtually
// deleted
.mapToPair(
(PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
.groupByKey(new RelationPartitioner(rels.getNumPartitions()))
.map(p -> Iterables.limit(p._2(), MAX_RELS))
.flatMap(p -> p.iterator())
.rdd();
spark
.createDataset(d, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
private static JavaRDD<SortableRelation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

View File

@ -16,10 +16,10 @@ public class SortableRelation extends Relation implements Comparable<Relation>,
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("publicationDataset", 2);
weights.put("affiliation", 2);
weights.put("relationship", 3);
weights.put("similarity", 4);
weights.put("affiliation", 5);
weights.put("publicationDataset", 4);
weights.put("similarity", 5);
weights.put("provision", 6);
weights.put("participation", 7);

View File

@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.removePrefix;
import static eu.dnetlib.dhp.oa.provision.utils.XmlSerializationUtils.escapeXml;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@ -95,7 +96,7 @@ public class TemplateFactory {
.add("metadata", instancemetadata)
.add(
"webresources",
webresources
(webresources != null ? webresources : new ArrayList<String>())
.stream()
.filter(StringUtils::isNotBlank)
.map(w -> getWebResource(w))

View File

@ -174,6 +174,7 @@ public class XmlRecordFactory implements Serializable {
entity
.getCollectedfrom()
.stream()
.filter(XmlRecordFactory::kvNotBlank)
.map(kv -> XmlSerializationUtils.mapKeyValue("collectedfrom", kv))
.collect(Collectors.toList()));
}
@ -183,6 +184,7 @@ public class XmlRecordFactory implements Serializable {
entity
.getOriginalId()
.stream()
.filter(Objects::nonNull)
.map(s -> XmlSerializationUtils.asXmlElement("originalId", s))
.collect(Collectors.toList()));
}
@ -192,6 +194,7 @@ public class XmlRecordFactory implements Serializable {
entity
.getPid()
.stream()
.filter(Objects::nonNull)
.map(p -> XmlSerializationUtils.mapStructuredProperty("pid", p))
.collect(Collectors.toList()));
}
@ -213,6 +216,7 @@ public class XmlRecordFactory implements Serializable {
r
.getTitle()
.stream()
.filter(Objects::nonNull)
.map(t -> XmlSerializationUtils.mapStructuredProperty("title", t))
.collect(Collectors.toList()));
}
@ -225,6 +229,7 @@ public class XmlRecordFactory implements Serializable {
r
.getAuthor()
.stream()
.filter(Objects::nonNull)
.map(
a -> {
final StringBuilder sb = new StringBuilder("<creator rank=\"" + a.getRank() + "\"");
@ -240,24 +245,26 @@ public class XmlRecordFactory implements Serializable {
a
.getPid()
.stream()
.filter(Objects::nonNull)
.filter(
sp -> isNotBlank(sp.getQualifier().getClassid())
&& isNotBlank(sp.getValue()))
.collect(
Collectors
.toMap(
p -> getAuthorPidType(p.getQualifier().getClassid()),
p -> p,
(p1, p2) -> p1))
.values()
.forEach(
sp -> {
String pidType = XmlSerializationUtils
.escapeXml(
sp.getQualifier().getClassid())
.replaceAll("\\W", "");
String pidType = getAuthorPidType(sp.getQualifier().getClassid());
String pidValue = XmlSerializationUtils.escapeXml(sp.getValue());
// ugly hack: some records
// provide swapped pidtype and
// pidvalue
// ugly hack: some records provide swapped pidtype and pidvalue
if (authorPidTypes.contains(pidValue.toLowerCase().trim())) {
sb.append(String.format(" %s=\"%s\"", pidValue, pidType));
} else {
pidType = pidType.replaceAll("\\W", "").replaceAll("\\d", "");
if (isNotBlank(pidType)) {
sb
.append(
@ -285,6 +292,7 @@ public class XmlRecordFactory implements Serializable {
r
.getContributor()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("contributor", c.getValue()))
.collect(Collectors.toList()));
}
@ -294,6 +302,7 @@ public class XmlRecordFactory implements Serializable {
r
.getCountry()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.mapQualifier("country", c))
.collect(Collectors.toList()));
}
@ -303,6 +312,7 @@ public class XmlRecordFactory implements Serializable {
r
.getCoverage()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("coverage", c.getValue()))
.collect(Collectors.toList()));
}
@ -319,6 +329,7 @@ public class XmlRecordFactory implements Serializable {
r
.getDescription()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("description", c.getValue()))
.collect(Collectors.toList()));
}
@ -333,6 +344,7 @@ public class XmlRecordFactory implements Serializable {
r
.getSubject()
.stream()
.filter(Objects::nonNull)
.map(s -> XmlSerializationUtils.mapStructuredProperty("subject", s))
.collect(Collectors.toList()));
}
@ -345,6 +357,7 @@ public class XmlRecordFactory implements Serializable {
r
.getRelevantdate()
.stream()
.filter(Objects::nonNull)
.map(s -> XmlSerializationUtils.mapStructuredProperty("relevantdate", s))
.collect(Collectors.toList()));
}
@ -357,6 +370,7 @@ public class XmlRecordFactory implements Serializable {
r
.getSource()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("source", c.getValue()))
.collect(Collectors.toList()));
}
@ -366,6 +380,7 @@ public class XmlRecordFactory implements Serializable {
r
.getFormat()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("format", c.getValue()))
.collect(Collectors.toList()));
}
@ -429,6 +444,7 @@ public class XmlRecordFactory implements Serializable {
orp
.getContactperson()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("contactperson", c.getValue()))
.collect(Collectors.toList()));
}
@ -439,6 +455,7 @@ public class XmlRecordFactory implements Serializable {
orp
.getContactgroup()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("contactgroup", c.getValue()))
.collect(Collectors.toList()));
}
@ -448,6 +465,7 @@ public class XmlRecordFactory implements Serializable {
orp
.getTool()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("tool", c.getValue()))
.collect(Collectors.toList()));
}
@ -461,6 +479,7 @@ public class XmlRecordFactory implements Serializable {
s
.getDocumentationUrl()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("documentationUrl", c.getValue()))
.collect(Collectors.toList()));
}
@ -470,6 +489,7 @@ public class XmlRecordFactory implements Serializable {
s
.getLicense()
.stream()
.filter(Objects::nonNull)
.map(l -> XmlSerializationUtils.mapStructuredProperty("license", l))
.collect(Collectors.toList()));
}
@ -576,6 +596,7 @@ public class XmlRecordFactory implements Serializable {
ds
.getOdlanguages()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("odlanguages", c.getValue()))
.collect(Collectors.toList()));
}
@ -585,6 +606,7 @@ public class XmlRecordFactory implements Serializable {
ds
.getOdcontenttypes()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("odcontenttypes", c.getValue()))
.collect(Collectors.toList()));
}
@ -697,6 +719,7 @@ public class XmlRecordFactory implements Serializable {
ds
.getPolicies()
.stream()
.filter(XmlRecordFactory::kvNotBlank)
.map(kv -> XmlSerializationUtils.mapKeyValue("policies", kv))
.collect(Collectors.toList()));
}
@ -709,6 +732,7 @@ public class XmlRecordFactory implements Serializable {
ds
.getSubjects()
.stream()
.filter(Objects::nonNull)
.map(sp -> XmlSerializationUtils.mapStructuredProperty("subjects", sp))
.collect(Collectors.toList()));
}
@ -735,6 +759,7 @@ public class XmlRecordFactory implements Serializable {
o
.getAlternativeNames()
.stream()
.filter(Objects::nonNull)
.map(c -> XmlSerializationUtils.asXmlElement("alternativeNames", c.getValue()))
.collect(Collectors.toList()));
}
@ -862,6 +887,7 @@ public class XmlRecordFactory implements Serializable {
p
.getSubjects()
.stream()
.filter(Objects::nonNull)
.map(sp -> XmlSerializationUtils.mapStructuredProperty("subject", sp))
.collect(Collectors.toList()));
}
@ -912,7 +938,12 @@ public class XmlRecordFactory implements Serializable {
if (p.getFundingtree() != null) {
metadata
.addAll(
p.getFundingtree().stream().map(ft -> ft.getValue()).collect(Collectors.toList()));
p
.getFundingtree()
.stream()
.filter(Objects::nonNull)
.map(ft -> ft.getValue())
.collect(Collectors.toList()));
}
break;
@ -923,6 +954,17 @@ public class XmlRecordFactory implements Serializable {
return metadata;
}
private String getAuthorPidType(String s) {
return XmlSerializationUtils
.escapeXml(s)
.replaceAll("\\W", "")
.replaceAll("\\d", "");
}
private static boolean kvNotBlank(KeyValue kv) {
return kv != null && StringUtils.isNotBlank(kv.getKey()) && StringUtils.isNotBlank(kv.getValue());
}
private void mapDatasourceType(List<String> metadata, final Qualifier dsType) {
metadata.add(XmlSerializationUtils.mapQualifier("datasourcetype", dsType));
@ -960,7 +1002,7 @@ public class XmlRecordFactory implements Serializable {
.add(
XmlSerializationUtils.asXmlElement("coderepositoryurl", re.getCodeRepositoryUrl()));
}
if (re.getResulttype() != null & re.getResulttype().isBlank()) {
if (re.getResulttype() != null && re.getResulttype().isBlank()) {
metadata.add(XmlSerializationUtils.mapQualifier("resulttype", re.getResulttype()));
}
if (re.getCollectedfrom() != null) {
@ -969,6 +1011,7 @@ public class XmlRecordFactory implements Serializable {
re
.getCollectedfrom()
.stream()
.filter(XmlRecordFactory::kvNotBlank)
.map(kv -> XmlSerializationUtils.mapKeyValue("collectedfrom", kv))
.collect(Collectors.toList()));
}
@ -986,10 +1029,10 @@ public class XmlRecordFactory implements Serializable {
if (isNotBlank(re.getOfficialname())) {
metadata.add(XmlSerializationUtils.asXmlElement("officialname", re.getOfficialname()));
}
if (re.getDatasourcetype() != null & !re.getDatasourcetype().isBlank()) {
if (re.getDatasourcetype() != null && !re.getDatasourcetype().isBlank()) {
mapDatasourceType(metadata, re.getDatasourcetype());
}
if (re.getOpenairecompatibility() != null & !re.getOpenairecompatibility().isBlank()) {
if (re.getOpenairecompatibility() != null && !re.getOpenairecompatibility().isBlank()) {
metadata
.add(
XmlSerializationUtils
@ -1006,7 +1049,7 @@ public class XmlRecordFactory implements Serializable {
.add(
XmlSerializationUtils.asXmlElement("legalshortname", re.getLegalshortname()));
}
if (re.getCountry() != null & !re.getCountry().isBlank()) {
if (re.getCountry() != null && !re.getCountry().isBlank()) {
metadata.add(XmlSerializationUtils.mapQualifier("country", re.getCountry()));
}
break;
@ -1020,10 +1063,10 @@ public class XmlRecordFactory implements Serializable {
if (isNotBlank(re.getAcronym())) {
metadata.add(XmlSerializationUtils.asXmlElement("acronym", re.getAcronym()));
}
if (re.getContracttype() != null & !re.getContracttype().isBlank()) {
if (re.getContracttype() != null && !re.getContracttype().isBlank()) {
metadata.add(XmlSerializationUtils.mapQualifier("contracttype", re.getContracttype()));
}
if (re.getFundingtree() != null & contexts != null) {
if (re.getFundingtree() != null && contexts != null) {
metadata
.addAll(
re
@ -1091,12 +1134,12 @@ public class XmlRecordFactory implements Serializable {
.add(
XmlSerializationUtils.mapQualifier("accessright", instance.getAccessright()));
}
if (instance.getCollectedfrom() != null) {
if (instance.getCollectedfrom() != null && kvNotBlank(instance.getCollectedfrom())) {
fields
.add(
XmlSerializationUtils.mapKeyValue("collectedfrom", instance.getCollectedfrom()));
}
if (instance.getHostedby() != null) {
if (instance.getHostedby() != null && kvNotBlank(instance.getHostedby())) {
fields.add(XmlSerializationUtils.mapKeyValue("hostedby", instance.getHostedby()));
}
if (instance.getDateofacceptance() != null

View File

@ -21,6 +21,18 @@
"paramName": "rp",
"paramLongName": "relPartitions",
"paramDescription": "number or partitions for the relations Dataset",
"paramRequired": true
"paramRequired": false
},
{
"paramName": "rf",
"paramLongName": "relationFilter",
"paramDescription": "filter applied reading relations (by relClass)",
"paramRequired": false
},
{
"paramName": "mr",
"paramLongName": "maxRelations",
"paramDescription": "maximum number of relations allowed for a each entity",
"paramRequired": false
}
]

View File

@ -9,6 +9,30 @@
<name>isLookupUrl</name>
<description>URL for the isLookup service</description>
</property>
<property>
<name>relPartitions</name>
<description>number or partitions for the relations Dataset</description>
</property>
<property>
<name>relationFilter</name>
<description>filter applied reading relations (by relClass)</description>
</property>
<property>
<name>maxRelations</name>
<description>maximum number of relations allowed for a each entity</description>
</property>
<property>
<name>otherDsTypeId</name>
<description>mapping used to populate datasourceTypeUi field</description>
</property>
<property>
<name>format</name>
<description>metadata format name (DMF|TMF)</description>
</property>
<property>
<name>batchSize</name>
<description>number of records to be included in each indexing request</description>
</property>
<property>
<name>sparkDriverMemoryForJoining</name>
@ -56,6 +80,10 @@
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
<property>
<name>sparkNetworkTimeout</name>
<description>configures spark.network.timeout</description>
</property>
</parameters>
<global>
@ -69,12 +97,16 @@
</configuration>
</global>
<start to="reuse_records"/>
<start to="resume_from"/>
<decision name="reuse_records">
<decision name="resume_from">
<switch>
<case to="prepare_relations">${wf:conf('reuseRecords') eq false}</case>
<case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case>
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="join_all_entities">${wf:conf('resumeFrom') eq 'join_all_entities'}</case>
<case to="adjancency_lists">${wf:conf('resumeFrom') eq 'adjancency_lists'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/>
</switch>
</decision>
@ -309,7 +341,6 @@
<join name="wait_joins" to="join_all_entities"/>
<action name="join_all_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -419,4 +450,5 @@
</action>
<end name="End"/>
</workflow-app>