enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
9 changed files with 126 additions and 49 deletions
Showing only changes of commit 5e0e554000 - Show all commits

View File

@ -79,8 +79,6 @@ public class SparkRemoveBlacklistedRelationJob {
Dataset<Relation> inputRelation = readRelations(spark, inputPath);
Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);
log.info("InputRelationCount: {}", inputRelation.count());
Dataset<Relation> dedupSource = blackListed
.joinWith(
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
@ -103,11 +101,6 @@ public class SparkRemoveBlacklistedRelationJob {
return c._1();
}, Encoders.bean(Relation.class));
dedupBL
.write()
.mode(SaveMode.Overwrite)
.json(blacklistPath + "/deduped");
inputRelation
.joinWith(
dedupBL, (inputRelation

View File

@ -4,31 +4,48 @@ package eu.dnetlib.dhp.broker.model;
public enum Topic {
// ENRICHMENT MISSING
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT(
"ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE(
"ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID(
"ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE(
"ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC(
"ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV(
"ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL(
"ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC(
"ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM(
"ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK(
"ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID(
"ENRICH/MISSING/AUTHOR/ORCID"),
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"),
ENRICH_MISSING_ABSTRACT("ENRICH/MISSING/ABSTRACT"),
ENRICH_MISSING_PUBLICATION_DATE("ENRICH/MISSING/PUBLICATION_DATE"),
ENRICH_MISSING_PID("ENRICH/MISSING/PID"),
ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"),
ENRICH_MISSING_SOFTWARE("ENRICH/MISSING/SOFTWARE"),
ENRICH_MISSING_SUBJECT_MESHEUROPMC("ENRICH/MISSING/SUBJECT/MESHEUROPMC"),
ENRICH_MISSING_SUBJECT_ARXIV("ENRICH/MISSING/SUBJECT/ARXIV"),
ENRICH_MISSING_SUBJECT_JEL("ENRICH/MISSING/SUBJECT/JEL"),
ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"),
ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"),
ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"),
ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"),
// ENRICHMENT MORE
ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT(
"ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT(
"ENRICH/MORE/PROJECT"), ENRICH_MORE_SUBJECT_MESHEUROPMC(
"ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV(
"ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL(
"ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC(
"ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM(
"ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
ENRICH_MORE_PID("ENRICH/MORE/PID"),
ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"),
ENRICH_MORE_ABSTRACT("ENRICH/MORE/ABSTRACT"),
ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"),
ENRICH_MORE_PROJECT("ENRICH/MORE/PROJECT"),
ENRICH_MORE_SUBJECT_MESHEUROPMC("ENRICH/MORE/SUBJECT/MESHEUROPMC"),
ENRICH_MORE_SUBJECT_ARXIV("ENRICH/MORE/SUBJECT/ARXIV"),
ENRICH_MORE_SUBJECT_JEL("ENRICH/MORE/SUBJECT/JEL"),
ENRICH_MORE_SUBJECT_DDC("ENRICH/MORE/SUBJECT/DDC"),
ENRICH_MORE_SUBJECT_ACM("ENRICH/MORE/SUBJECT/ACM"),
ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
// ADDITION
ADD_BY_PROJECT("ADD/BY_PROJECT");
ADD_BY_PROJECT("ADD/BY_PROJECT"),
// OTHER RELS
ENRICH_MISSING_PUBLICATION_IS_RELATED_TO("ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"),
ENRICH_MISSING_PUBLICATION_REFERENCES("ENRICH/MISSING/PUBLICATION/REFERENCES"),
ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY("ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"),
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"),
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"),
ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"),
ENRICH_MISSING_DATASET_REFERENCES("ENRICH/MISSING/DATASET/REFERENCES"),
ENRICH_MISSING_DATASET_IS_REFERENCED_BY("ENRICH/MISSING/DATASET/IS_REFERENCED_BY"),
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"),
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY");
Topic(final String path) {
this.path = path;
@ -42,9 +59,7 @@ public enum Topic {
public static Topic fromPath(final String path) {
for (final Topic t : Topic.values()) {
if (t.getPath().equals(path)) {
return t;
}
if (t.getPath().equals(path)) { return t; }
}
return null;
}

View File

@ -9,11 +9,21 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.EventFactory;
@ -30,7 +40,11 @@ import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
public class GenerateEventsApplication {
@ -47,11 +61,12 @@ public class GenerateEventsApplication {
private static final UpdateMatcher<?> enrichMorePid = new EnrichMorePid();
private static final UpdateMatcher<?> enrichMoreSubject = new EnrichMoreSubject();
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsApplication.class
.toString(GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args);
@ -67,10 +82,23 @@ public class GenerateEventsApplication {
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
removeOutputDir(spark, eventsPath);
generateEvents(spark, graphPath, eventsPath);
final JavaRDD<Event> eventsRdd = sc.emptyRDD();
eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class));
eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class));
eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class));
eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class));
eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
});
}
@ -79,11 +107,34 @@ public class GenerateEventsApplication {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) {
// TODO
private static <R extends Result> JavaRDD<Event> generateSimpleEvents(final SparkSession spark,
final String graphPath,
final Class<R> resultClazz) {
final Dataset<R> results =
readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
.filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<Relation> rels =
readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals("TODO")); // TODO mergedIN
final Column c = null; // TODO
final Dataset<Row> aa = results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
.groupBy(rels.col("target"))
.agg(c)
.filter(x -> x.size() > 1)
// generateSimpleEvents(...)
// flatMap()
// toRdd()
;
return null;
}
private List<Event> generateEvents(final Result... children) {
private List<Event> generateSimpleEvents(final Result... children) {
final List<UpdateInfo<?>> list = new ArrayList<>();
for (final Result target : children) {
@ -102,4 +153,13 @@ public class GenerateEventsApplication {
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,
final Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -16,12 +17,15 @@ public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
@Override
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) {
return Arrays.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target));
}
return new ArrayList<>();
}
@Override
public UpdateInfo<String> generateUpdateInfo(final String highlightValue, final Result source,
public UpdateInfo<String> generateUpdateInfo(final String highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_DATE,

View File

@ -30,8 +30,7 @@ public abstract class UpdateMatcher<T> {
if (source != res) {
for (final UpdateInfo<T> info : findUpdates(source, res)) {
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else {
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {} else {
infoMap.put(s, info);
}
}
@ -54,11 +53,16 @@ public abstract class UpdateMatcher<T> {
protected abstract List<UpdateInfo<T>> findUpdates(Result source, Result target);
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue, final Result source,
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue,
final Result source,
final Result target);
protected static boolean isMissing(final List<Field<String>> list) {
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());
}
protected boolean isMissing(final Field<String> field) {
return field == null || StringUtils.isBlank(field.getValue());
}
}

View File

@ -129,6 +129,9 @@ public class DedupUtility {
.max(Comparator.comparing(Tuple2::_1));
if (simAuhtor.isPresent() && simAuhtor.get()._1() > THRESHOLD) {
Author r = simAuhtor.get()._2();
if (r.getPid() == null) {
r.setPid(new ArrayList<>());
}
r.getPid().add(a._1());
}
});

View File

@ -53,9 +53,7 @@ public class PrepareResultCommunitySetStep2 {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
mergeInfo(spark, inputPath, outputPath);
});
}

View File

@ -84,7 +84,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
// removeOutputDir(spark, outputPath);
if (saveGraph) {
execPropagation(
spark,

View File

@ -42,8 +42,8 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
final String pid = e.attributeValue("nameIdentifier");
final String pidType = e.attributeValue("nameIdentifierScheme");
if (StringUtils.isNotBlank(pid) && StringUtils.isNotBlank(pidType)) {
author.setPid(new ArrayList<>());
if (StringUtils.isNotBlank(pid) && StringUtils.isNotBlank(pidType)) {
author
.getPid()
.add(structuredProperty(pid, qualifier(pidType, pidType, DNET_PID_TYPES, DNET_PID_TYPES), info));