1
0
Fork 0

Reimplementation of the author merging in dedup record creation. Implementation of the test class. Minor changes.

This commit is contained in:
miconis 2020-05-21 12:02:44 +02:00
commit 8b35e0e7f0
29 changed files with 489 additions and 206 deletions

View File

@ -79,8 +79,6 @@ public class SparkRemoveBlacklistedRelationJob {
Dataset<Relation> inputRelation = readRelations(spark, inputPath);
Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);
log.info("InputRelationCount: {}", inputRelation.count());
Dataset<Relation> dedupSource = blackListed
.joinWith(
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
@ -103,11 +101,6 @@ public class SparkRemoveBlacklistedRelationJob {
return c._1();
}, Encoders.bean(Relation.class));
dedupBL
.write()
.mode(SaveMode.Overwrite)
.json(blacklistPath + "/deduped");
inputRelation
.joinWith(
dedupBL, (inputRelation

View File

@ -4,31 +4,48 @@ package eu.dnetlib.dhp.broker.model;
public enum Topic {
// ENRICHMENT MISSING
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT(
"ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE(
"ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID(
"ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE(
"ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC(
"ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV(
"ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL(
"ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC(
"ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM(
"ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK(
"ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID(
"ENRICH/MISSING/AUTHOR/ORCID"),
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"),
ENRICH_MISSING_ABSTRACT("ENRICH/MISSING/ABSTRACT"),
ENRICH_MISSING_PUBLICATION_DATE("ENRICH/MISSING/PUBLICATION_DATE"),
ENRICH_MISSING_PID("ENRICH/MISSING/PID"),
ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"),
ENRICH_MISSING_SOFTWARE("ENRICH/MISSING/SOFTWARE"),
ENRICH_MISSING_SUBJECT_MESHEUROPMC("ENRICH/MISSING/SUBJECT/MESHEUROPMC"),
ENRICH_MISSING_SUBJECT_ARXIV("ENRICH/MISSING/SUBJECT/ARXIV"),
ENRICH_MISSING_SUBJECT_JEL("ENRICH/MISSING/SUBJECT/JEL"),
ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"),
ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"),
ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"),
ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"),
// ENRICHMENT MORE
ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT(
"ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT(
"ENRICH/MORE/PROJECT"), ENRICH_MORE_SUBJECT_MESHEUROPMC(
"ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV(
"ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL(
"ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC(
"ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM(
"ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
ENRICH_MORE_PID("ENRICH/MORE/PID"),
ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"),
ENRICH_MORE_ABSTRACT("ENRICH/MORE/ABSTRACT"),
ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"),
ENRICH_MORE_PROJECT("ENRICH/MORE/PROJECT"),
ENRICH_MORE_SUBJECT_MESHEUROPMC("ENRICH/MORE/SUBJECT/MESHEUROPMC"),
ENRICH_MORE_SUBJECT_ARXIV("ENRICH/MORE/SUBJECT/ARXIV"),
ENRICH_MORE_SUBJECT_JEL("ENRICH/MORE/SUBJECT/JEL"),
ENRICH_MORE_SUBJECT_DDC("ENRICH/MORE/SUBJECT/DDC"),
ENRICH_MORE_SUBJECT_ACM("ENRICH/MORE/SUBJECT/ACM"),
ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
// ADDITION
ADD_BY_PROJECT("ADD/BY_PROJECT");
ADD_BY_PROJECT("ADD/BY_PROJECT"),
// OTHER RELS
ENRICH_MISSING_PUBLICATION_IS_RELATED_TO("ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"),
ENRICH_MISSING_PUBLICATION_REFERENCES("ENRICH/MISSING/PUBLICATION/REFERENCES"),
ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY("ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"),
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"),
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"),
ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"),
ENRICH_MISSING_DATASET_REFERENCES("ENRICH/MISSING/DATASET/REFERENCES"),
ENRICH_MISSING_DATASET_IS_REFERENCED_BY("ENRICH/MISSING/DATASET/IS_REFERENCED_BY"),
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"),
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY");
Topic(final String path) {
this.path = path;
@ -42,9 +59,7 @@ public enum Topic {
/**
 * Looks up the topic whose path string equals the given one.
 *
 * @param path the topic path to search for, e.g. {@code "ENRICH/MISSING/PID"}
 * @return the first topic whose {@code getPath()} equals {@code path}, or {@code null} when none matches
 */
public static Topic fromPath(final String path) {
    for (final Topic topic : Topic.values()) {
        // A single comparison per topic is enough; the check used to be written twice.
        if (topic.getPath().equals(path)) {
            return topic;
        }
    }
    return null;
}

View File

@ -9,28 +9,42 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.util.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.util.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.broker.oa.util.UpdateMatcher;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
public class GenerateEventsApplication {
@ -47,12 +61,13 @@ public class GenerateEventsApplication {
private static final UpdateMatcher<?> enrichMorePid = new EnrichMorePid();
private static final UpdateMatcher<?> enrichMoreSubject = new EnrichMoreSubject();
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
.toString(GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
@ -67,10 +82,23 @@ public class GenerateEventsApplication {
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
removeOutputDir(spark, eventsPath);
generateEvents(spark, graphPath, eventsPath);
final JavaRDD<Event> eventsRdd = sc.emptyRDD();
eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class));
eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class));
eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class));
eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class));
eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
});
}
@ -79,11 +107,34 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) {
// TODO
/**
 * Work in progress: intended to produce the events for one result type of the graph.
 * <p>
 * As written, the method only sets up the input datasets and an unfinished join/aggregation,
 * and it always returns {@code null}.
 *
 * @param spark the Spark session used to read the input
 * @param graphPath base path of the graph; results are read from
 *        {@code graphPath + "/" + <lower-cased simple class name>} and relations from
 *        {@code graphPath + "/relation"}
 * @param resultClazz the result type to read
 * @return currently always {@code null}
 */
private static <R extends Result> JavaRDD<Event> generateSimpleEvents(final SparkSession spark,
final String graphPath,
final Class<R> resultClazz) {
// Keep only the records whose dataInfo.deletedbyinference flag is true.
final Dataset<R> results =
readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
.filter(r -> r.getDataInfo().getDeletedbyinference());
// "TODO" is a placeholder relation class: the real value still has to be filled in.
final Dataset<Relation> rels =
readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals("TODO")); // TODO mergedIN
// The aggregation column is not defined yet, so the agg(c) call below receives null.
final Column c = null; // TODO
// Join results to relations on result.id == relation.source and group by relation.target.
// NOTE(review): the remaining steps are only sketched in the trailing comments, and `aa` is never used.
final Dataset<Row> aa = results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupBy(rels.col("target"))
.agg(c)
.filter(x -> x.size() > 1)
// generateSimpleEvents(...)
// flatMap()
// toRdd()
;
// No events are produced yet.
return null;
}
private List<Event> generateEvents(final Result... children) {
private List<Event> generateSimpleEvents(final Result... children) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Result target : children) {
@ -102,4 +153,13 @@ public class GenerateEventsApplication {
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
/**
 * Loads a text file and deserializes every line into an instance of the given class.
 *
 * @param spark the Spark session used for reading
 * @param inputPath location of the text file(s) to read
 * @param clazz target class; each line is parsed with {@code OBJECT_MAPPER} and the resulting
 *        dataset uses a bean encoder for this class
 * @return the dataset of deserialized objects
 */
public static <R> Dataset<R> readPath(
    final SparkSession spark,
    final String inputPath,
    final Class<R> clazz) {

    final Dataset<String> lines = spark.read().textFile(inputPath);
    final MapFunction<String, R> parseLine = json -> OBJECT_MAPPER.readValue(json, clazz);
    return lines.map(parseLine, Encoders.bean(clazz));
}
}

View File

@ -1,11 +1,12 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAbstract extends UpdateMatcher<String> {

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
@ -7,6 +7,7 @@ import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Pair<String, String>> {

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Matcher for the {@code ENRICH_MISSING_OA_VERSION} topic.
 * <p>
 * When the target has no instance whose access right is {@link BrokerConstants#OPEN_ACCESS},
 * every open-access instance of the source is proposed as an update (one per URL).
 */
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> {

    public EnrichMissingOpenAccess() {
        super(true);
    }

    /**
     * @param source the record providing candidate instances
     * @param target the record to be enriched
     * @return one update per URL of each open-access source instance, or an empty list when the
     *         target already has at least one open-access instance
     */
    @Override
    protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target) {
        final long openAccessInTarget = target
            .getInstance()
            .stream()
            .filter(inst -> inst.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
            .count();

        if (openAccessInTarget == 0) {
            return source
                .getInstance()
                .stream()
                .filter(inst -> inst.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
                .flatMap(ConversionUtils::oafInstanceToBrokerInstances)
                .map(brokerInstance -> generateUpdateInfo(brokerInstance, source, target))
                .collect(Collectors.toList());
        }

        // The target already exposes an open-access version: nothing to suggest.
        return Arrays.asList();
    }

    /**
     * Wraps a broker instance into an update for the {@code ENRICH_MISSING_OA_VERSION} topic.
     * The highlight is added to the publication's instances and rendered as its URL.
     */
    @Override
    public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
        final Result source,
        final Result target) {
        return new UpdateInfo<>(
            Topic.ENRICH_MISSING_OA_VERSION,
            highlightValue,
            source,
            target,
            (publication, instance) -> publication.getInstances().add(instance),
            instance -> instance.getUrl());
    }
}

View File

@ -1,11 +1,14 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPid extends UpdateMatcher<Pid> {
@ -16,8 +19,18 @@ public class EnrichMissingPid extends UpdateMatcher<Pid> {
@Override
/**
 * Proposes every PID of the source when the target has no PID at all.
 *
 * @param source the record providing candidate PIDs
 * @param target the record to be enriched
 * @return one update per source PID, or an empty list when the target already has at least one PID
 */
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target) {
    // The former stub "return Arrays.asList();" stood here and made the logic below unreachable.
    if (!target.getPid().isEmpty()) {
        return Arrays.asList();
    }

    return source
        .getPid()
        .stream()
        .map(ConversionUtils::oafPidToBrokerPid)
        .map(pid -> generateUpdateInfo(pid, source, target))
        .collect(Collectors.toList());
}
@Override

View File

@ -1,11 +1,12 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Project;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingProject extends UpdateMatcher<Project> {
@ -21,7 +22,8 @@ public class EnrichMissingProject extends UpdateMatcher<Project> {
}
@Override
public UpdateInfo<Project> generateUpdateInfo(final Project highlightValue, final Result source,
public UpdateInfo<Project> generateUpdateInfo(final Project highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PROJECT,

View File

@ -1,10 +1,12 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
@ -15,12 +17,15 @@ public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
@Override
/**
 * Proposes the source's date of acceptance when the target lacks one.
 *
 * @param source the record providing the candidate date
 * @param target the record to be enriched
 * @return a single update carrying the source date when the target date is missing and the
 *         source date is not; otherwise an empty list
 */
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target) {
    // The former stub "return Arrays.asList();" stood here and made the logic below unreachable.
    if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) {
        return Arrays.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target));
    }
    return new ArrayList<>();
}
@Override
public UpdateInfo<String> generateUpdateInfo(final String highlightValue, final Result source,
public UpdateInfo<String> generateUpdateInfo(final String highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_DATE,

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
/**
 * Matcher for the {@code ENRICH/MISSING/SUBJECT/<type>} topics.
 * <p>
 * A subject of the source is proposed when the target has no subject of the same
 * classification type (the qualifier's class id).
 */
public class EnrichMissingSubject extends UpdateMatcher<Pair<String, String>> {

    public EnrichMissingSubject() {
        super(true);
    }

    /**
     * @param source the record providing candidate subjects
     * @param target the record to be enriched
     * @return one update per source subject whose classification type does not occur among the
     *         target's subjects
     */
    @Override
    protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target) {
        final Set<String> existingTypes = target
            .getSubject()
            .stream()
            .map(StructuredProperty::getQualifier)
            .map(Qualifier::getClassid)
            .collect(Collectors.toSet());

        // Candidates must come from the source's subjects: reading getPid() here would turn
        // persistent identifiers into subject suggestions.
        return source
            .getSubject()
            .stream()
            .filter(subject -> !existingTypes.contains(subject.getQualifier().getClassid()))
            .map(ConversionUtils::oafSubjectToPair)
            .map(pair -> generateUpdateInfo(pair, source, target))
            .collect(Collectors.toList());
    }

    /**
     * Wraps a (type, value) subject pair into an update.
     * <p>
     * The topic is resolved from the path {@code "ENRICH/MISSING/SUBJECT/" + type}. The pair's
     * value is added to the publication's subjects, and the highlight is rendered as
     * {@code type::value}.
     */
    @Override
    public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
        final Result source,
        final Result target) {
        return new UpdateInfo<>(
            Topic.fromPath("ENRICH/MISSING/SUBJECT/" + highlightValue.getLeft()),
            highlightValue, source, target,
            (p, pair) -> p.getSubjects().add(pair.getRight()),
            pair -> pair.getLeft() + "::" + pair.getRight());
    }
}

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Matcher for the {@code ENRICH_MORE_OA_VERSION} topic.
 * <p>
 * Open-access instances of the source are proposed when their URL is not already among the
 * URLs of the target's open-access instances.
 */
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> {

    public EnrichMoreOpenAccess() {
        super(true);
    }

    /**
     * @param source the record providing candidate instances
     * @param target the record to be enriched
     * @return one update per open-access source URL that the target's open-access instances lack
     */
    @Override
    protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target) {
        // URLs already exposed by the target through open-access instances.
        final Set<String> knownUrls = target
            .getInstance()
            .stream()
            .filter(inst -> inst.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
            .flatMap(inst -> inst.getUrl().stream())
            .collect(Collectors.toSet());

        return source
            .getInstance()
            .stream()
            .filter(inst -> inst.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
            .flatMap(ConversionUtils::oafInstanceToBrokerInstances)
            .filter(brokerInstance -> !knownUrls.contains(brokerInstance.getUrl()))
            .map(brokerInstance -> generateUpdateInfo(brokerInstance, source, target))
            .collect(Collectors.toList());
    }

    /**
     * Wraps a broker instance into an update for the {@code ENRICH_MORE_OA_VERSION} topic.
     * The highlight is added to the publication's instances and rendered as its URL.
     */
    @Override
    public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
        final Result source,
        final Result target) {
        return new UpdateInfo<>(
            Topic.ENRICH_MORE_OA_VERSION,
            highlightValue,
            source,
            target,
            (publication, instance) -> publication.getInstances().add(instance),
            instance -> instance.getUrl());
    }
}

View File

@ -1,11 +1,14 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMorePid extends UpdateMatcher<Pid> {
@ -16,8 +19,19 @@ public class EnrichMorePid extends UpdateMatcher<Pid> {
@Override
/**
 * Proposes the PIDs of the source that the target does not have yet.
 * Two PIDs are considered equal when both their type (qualifier class id) and value match.
 *
 * @param source the record providing candidate PIDs
 * @param target the record to be enriched
 * @return one update per source PID whose {@code type::value} key is absent from the target
 */
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target) {
    // The former stub "return Arrays.asList();" stood here and made the logic below unreachable.
    final Set<String> existingPids = target
        .getPid()
        .stream()
        .map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
        .collect(Collectors.toSet());

    return source
        .getPid()
        .stream()
        .filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
        .map(ConversionUtils::oafPidToBrokerPid)
        .map(pid -> generateUpdateInfo(pid, source, target))
        .collect(Collectors.toList());
}
@Override

View File

@ -1,12 +1,15 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreSubject extends UpdateMatcher<Pair<String, String>> {
@ -17,18 +20,25 @@ public class EnrichMoreSubject extends UpdateMatcher<Pair<String, String>> {
@Override
/**
 * Proposes the subjects of the source that the target does not have yet.
 * Two subjects are considered equal when both their classification type (qualifier class id)
 * and value match.
 *
 * @param source the record providing candidate subjects
 * @param target the record to be enriched
 * @return one update per source subject whose {@code type::value} key is absent from the target
 */
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target) {
    final Set<String> existingSubjects = target
        .getSubject()
        .stream()
        .map(subject -> subject.getQualifier().getClassid() + "::" + subject.getValue())
        .collect(Collectors.toSet());

    // Candidates must come from the source's subjects (not its PIDs), and the former stub
    // "return Arrays.asList();" must not precede this statement.
    return source
        .getSubject()
        .stream()
        .filter(subject -> !existingSubjects.contains(subject.getQualifier().getClassid() + "::" + subject.getValue()))
        .map(ConversionUtils::oafSubjectToPair)
        .map(pair -> generateUpdateInfo(pair, source, target))
        .collect(Collectors.toList());
}
@Override
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
final Result source, final Result target) {
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.fromPath("ENRICH/MORE/SUBJECT/" + highlightValue.getLeft()),

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.broker.oa.util;
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.Collection;
@ -10,6 +10,7 @@ import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -29,8 +30,7 @@ public abstract class UpdateMatcher<T> {
if (source != res) {
for (final UpdateInfo<T> info : findUpdates(source, res)) {
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else {
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {} else {
infoMap.put(s, info);
}
}
@ -53,11 +53,16 @@ public abstract class UpdateMatcher<T> {
protected abstract List<UpdateInfo<T>> findUpdates(Result source, Result target);
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue, final Result source,
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue,
final Result source,
final Result target);
/**
 * Tells whether a list of string fields carries no usable value.
 *
 * @param list the fields to inspect; may be {@code null}
 * @return {@code true} when the list is {@code null} or empty, or when the value of its first
 *         element is blank
 */
protected static boolean isMissing(final List<Field<String>> list) {
    if (list == null || list.isEmpty()) {
        return true;
    }
    // Only the first element is inspected.
    return StringUtils.isBlank(list.get(0).getValue());
}
/**
 * Tells whether a single string field carries no usable value.
 *
 * @param field the field to inspect; may be {@code null}
 * @return {@code true} when the field is {@code null} or its value is blank
 */
protected boolean isMissing(final Field<String> field) {
    if (field == null) {
        return true;
    }
    return StringUtils.isBlank(field.getValue());
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.broker.oa.util;
/**
 * Constants shared by the broker event generation code.
 */
public class BrokerConstants {

    /** Value compared against an instance's access-right class id to detect open access. */
    public static final String OPEN_ACCESS = "OPEN";

}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
/**
 * Helpers that convert OAF model objects into the broker's own objects.
 */
public class ConversionUtils {

    /**
     * Expands an OAF instance into one broker instance per URL.
     *
     * @param i the OAF instance to convert
     * @return a stream with one broker instance for each URL of {@code i}
     */
    public static Stream<Instance> oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
        return i.getUrl().stream().map(url -> toBrokerInstance(i, url));
    }

    /**
     * Builds the broker instance for a single URL of the given OAF instance.
     * The license is always set to {@link BrokerConstants#OPEN_ACCESS}.
     */
    private static Instance toBrokerInstance(final eu.dnetlib.dhp.schema.oaf.Instance oafInstance, final String url) {
        final Instance brokerInstance = new Instance();
        brokerInstance.setUrl(url);
        brokerInstance.setInstancetype(oafInstance.getInstancetype().getClassid());
        brokerInstance.setLicense(BrokerConstants.OPEN_ACCESS);
        brokerInstance.setHostedby(oafInstance.getHostedby().getValue());
        return brokerInstance;
    }

    /**
     * Converts an OAF pid into a broker pid.
     *
     * @param sp the structured property holding the pid value and its qualifier
     * @return a broker pid whose value is the property value and whose type is the qualifier's class id
     */
    public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
        final Pid brokerPid = new Pid();
        brokerPid.setValue(sp.getValue());
        brokerPid.setType(sp.getQualifier().getClassid());
        return brokerPid;
    }

    /**
     * Converts an OAF subject into a (type, value) pair.
     *
     * @param sp the structured property holding the subject
     * @return a pair whose left side is the qualifier's class id and whose right side is the value
     */
    public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
        final String type = sp.getQualifier().getClassid();
        final String value = sp.getValue();
        return Pair.of(type, value);
    }

}

View File

@ -1,33 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Stub matcher for the {@code ENRICH_MISSING_OA_VERSION} topic: it never finds any update.
 */
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> {

    public EnrichMissingOpenAccess() {
        super(true);
    }

    /**
     * @return always an empty list
     */
    @Override
    protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target) {
        final List<UpdateInfo<Instance>> noUpdates = Arrays.asList();
        return noUpdates;
    }

    /**
     * Wraps a broker instance into an update for the {@code ENRICH_MISSING_OA_VERSION} topic.
     * The highlight is added to the publication's instances and rendered as its URL.
     */
    @Override
    public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
        final Result source,
        final Result target) {
        return new UpdateInfo<>(
            Topic.ENRICH_MISSING_OA_VERSION,
            highlightValue,
            source,
            target,
            (publication, instance) -> publication.getInstances().add(instance),
            instance -> instance.getUrl());
    }
}

View File

@ -1,40 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Stub matcher for the {@code ENRICH/MISSING/SUBJECT/<type>} topics: it never finds any update.
 */
public class EnrichMissingSubject extends UpdateMatcher<Pair<String, String>> {

    public EnrichMissingSubject() {
        super(true);
    }

    /**
     * @return always an empty list
     */
    @Override
    protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target) {
        // Subject types noted for the future implementation: MESHEUROPMC, ARXIV, JEL, DDC, ACM
        final List<UpdateInfo<Pair<String, String>>> noUpdates = Arrays.asList();
        return noUpdates;
    }

    /**
     * Wraps a (type, value) subject pair into an update.
     * <p>
     * The topic is resolved from the path {@code "ENRICH/MISSING/SUBJECT/" + type}. The pair's
     * value is added to the publication's subjects, and the highlight is rendered as
     * {@code type::value}.
     */
    @Override
    public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
        final Result source,
        final Result target) {
        return new UpdateInfo<>(
            Topic.fromPath("ENRICH/MISSING/SUBJECT/" + highlightValue.getLeft()),
            highlightValue,
            source,
            target,
            (publication, subject) -> publication.getSubjects().add(subject.getRight()),
            subject -> subject.getLeft() + "::" + subject.getRight());
    }
}

View File

@ -1,33 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
 * Stub matcher for the {@code ENRICH_MORE_OA_VERSION} topic: it never finds any update.
 */
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> {

    public EnrichMoreOpenAccess() {
        super(true);
    }

    /**
     * @return always an empty list
     */
    @Override
    protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target) {
        final List<UpdateInfo<Instance>> noUpdates = Arrays.asList();
        return noUpdates;
    }

    /**
     * Wraps a broker instance into an update for the {@code ENRICH_MORE_OA_VERSION} topic.
     * The highlight is added to the publication's instances and rendered as its URL.
     */
    @Override
    public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
        final Result source,
        final Result target) {
        return new UpdateInfo<>(
            Topic.ENRICH_MORE_OA_VERSION,
            highlightValue,
            source,
            target,
            (publication, instance) -> publication.getInstances().add(instance),
            instance -> instance.getUrl());
    }
}

View File

@ -25,7 +25,7 @@ public final class UpdateInfo<T> {
private final float trust;
protected UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target,
public UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target,
final BiConsumer<Publication, T> compileHighlight,
final Function<T, String> highlightToString) {
this.topic = topic;

View File

@ -53,9 +53,7 @@ public class PrepareResultCommunitySetStep2 {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
removeOutputDir(spark, outputPath);
mergeInfo(spark, inputPath, outputPath);
});
}

View File

@ -84,7 +84,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
// removeOutputDir(spark, outputPath);
if (saveGraph) {
execPropagation(
spark,

View File

@ -1,8 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.*;
@ -10,11 +9,13 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.graph.raw.common.PacePerson;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
public class OafToOafMapper extends AbstractMdRecordToOafMapper {
@ -28,15 +29,26 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final List<Author> res = new ArrayList<>();
int pos = 1;
for (final Object o : doc.selectNodes("//dc:creator")) {
final Node n = (Node) o;
final Element e = (Element) o;
final Author author = new Author();
author.setFullname(n.getText());
author.setFullname(e.getText());
author.setRank(pos++);
final PacePerson p = new PacePerson(n.getText(), false);
final PacePerson p = new PacePerson(e.getText(), false);
if (p.isAccurate()) {
author.setName(p.getNormalisedFirstName());
author.setSurname(p.getNormalisedSurname());
}
final String pid = e.attributeValue("nameIdentifier");
final String pidType = e.attributeValue("nameIdentifierScheme");
author.setPid(new ArrayList<>());
if (StringUtils.isNotBlank(pid) && StringUtils.isNotBlank(pidType)) {
author
.getPid()
.add(structuredProperty(pid, qualifier(pidType, pidType, DNET_PID_TYPES, DNET_PID_TYPES), info));
}
res.add(author);
}
return res;

View File

@ -63,17 +63,17 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
author.setSurname(surname);
}
author.setAffiliation(prepareListFields(doc, "./datacite:affiliation", info));
author.setPid(preparePids(doc, info));
author.setAffiliation(prepareListFields(n, "./datacite:affiliation", info));
author.setPid(preparePids(n, info));
author.setRank(pos++);
res.add(author);
}
return res;
}
private List<StructuredProperty> preparePids(final Document doc, final DataInfo info) {
private List<StructuredProperty> preparePids(final Node n, final DataInfo info) {
final List<StructuredProperty> res = new ArrayList<>();
for (final Object o : doc.selectNodes("./datacite:nameIdentifier")) {
for (final Object o : n.selectNodes("./datacite:nameIdentifier")) {
res
.add(
structuredProperty(

View File

@ -22,12 +22,13 @@ SELECT
'' AS inferenceprovenance,
d.id AS collectedfromid,
d.officialname AS collectedfromname,
o.country || '@@@' || o.country || '@@@dnet:countries@@@dnet:countries' AS country,
o.country || '@@@' || COALESCE(cntr.name,o.country) || '@@@dnet:countries@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
LEFT OUTER JOIN class cntr ON (cntr.code = o.country)

View File

@ -10,6 +10,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@ -19,11 +20,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
@ExtendWith(MockitoExtension.class)
public class MappersTest {
@ -54,7 +52,29 @@ public class MappersTest {
assertValidId(p.getId());
assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue()));
assertTrue(p.getAuthor().size() > 0);
Optional<Author> author = p
.getAuthor()
.stream()
.filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.findFirst();
assertTrue(author.isPresent());
StructuredProperty pid = author
.get()
.getPid()
.stream()
.findFirst()
.get();
assertEquals("0000-0001-6651-1178", pid.getValue());
assertEquals("ORCID", pid.getQualifier().getClassid());
assertEquals("ORCID", pid.getQualifier().getClassname());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename());
assertEquals("Votsi,Nefta", author.get().getFullname());
assertEquals("Votsi", author.get().getSurname());
assertEquals("Nefta", author.get().getName());
assertTrue(p.getSubject().size() > 0);
assertTrue(StringUtils.isNotBlank(p.getJournal().getIssnOnline()));
assertTrue(StringUtils.isNotBlank(p.getJournal().getName()));
@ -100,6 +120,38 @@ public class MappersTest {
assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertTrue(d.getAuthor().size() > 0);
Optional<Author> author = d
.getAuthor()
.stream()
.filter(a -> a.getPid() != null && !a.getPid().isEmpty())
.findFirst();
assertTrue(author.isPresent());
StructuredProperty pid = author
.get()
.getPid()
.stream()
.findFirst()
.get();
assertEquals("0000-0001-9074-1619", pid.getValue());
assertEquals("ORCID", pid.getQualifier().getClassid());
assertEquals("ORCID", pid.getQualifier().getClassname());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid());
assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename());
assertEquals("Baracchini, Theo", author.get().getFullname());
assertEquals("Baracchini", author.get().getSurname());
assertEquals("Theo", author.get().getName());
assertEquals(1, author.get().getAffiliation().size());
Optional<Field<String>> opAff = author
.get()
.getAffiliation()
.stream()
.findFirst();
assertTrue(opAff.isPresent());
Field<String> affiliation = opAff.get();
assertEquals("ISTI-CNR", affiliation.getValue());
assertTrue(d.getSubject().size() > 0);
assertTrue(d.getInstance().size() > 0);
assertTrue(d.getContext().size() > 0);

View File

@ -19,7 +19,7 @@
<metadata xmlns="http://namespace.openaire.eu/">
<dc:title>Ecosystem Service capacity is higher in areas of multiple designation types</dc:title>
<dc:creator>Nikolaidou,Charitini</dc:creator>
<dc:creator>Votsi,Nefta</dc:creator>
<dc:creator nameIdentifier="0000-0001-6651-1178" nameIdentifierScheme="ORCID">Votsi,Nefta</dc:creator>
<dc:creator>Sgardelis,Steanos</dc:creator>
<dc:creator>Halley,John</dc:creator>
<dc:creator>Pantis,John</dc:creator>

View File

@ -35,9 +35,10 @@
</creator>
<creator>
<creatorName>Baracchini, Theo</creatorName>
<nameIdentifier nameIdentifierScheme="ORCID">0000-0001-9074-1619</nameIdentifier>
<givenName>Theo</givenName>
<familyName>Baracchini</familyName>
<affiliation>Physics of Aquatic Systems Laboratory (APHYS) Margaretha Kamprad Chair, ENAC, EPFL, Lausanne, 1015, Switzerland</affiliation>
<affiliation>ISTI-CNR</affiliation>
</creator>
<creator>
<creatorName>Wüest, Alfred</creatorName>

View File

@ -521,6 +521,8 @@
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/country/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'workingDir' : '/tmp/beta_provision/working_dir/country',
'allowedtypes' : 'pubsrepository::institutional',
'whitelist' : '10|opendoar____::300891a62162b960cf02ce3827bb363c',