diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java index 29f6cbe3af..98088dd0aa 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java @@ -4,31 +4,48 @@ package eu.dnetlib.dhp.broker.model; public enum Topic { // ENRICHMENT MISSING - ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT( - "ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE( - "ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID( - "ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE( - "ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC( - "ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV( - "ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL( - "ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC( - "ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM( - "ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK( - "ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID( - "ENRICH/MISSING/AUTHOR/ORCID"), + ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), + ENRICH_MISSING_ABSTRACT("ENRICH/MISSING/ABSTRACT"), + ENRICH_MISSING_PUBLICATION_DATE("ENRICH/MISSING/PUBLICATION_DATE"), + ENRICH_MISSING_PID("ENRICH/MISSING/PID"), + ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), + ENRICH_MISSING_SOFTWARE("ENRICH/MISSING/SOFTWARE"), + ENRICH_MISSING_SUBJECT_MESHEUROPMC("ENRICH/MISSING/SUBJECT/MESHEUROPMC"), + ENRICH_MISSING_SUBJECT_ARXIV("ENRICH/MISSING/SUBJECT/ARXIV"), + ENRICH_MISSING_SUBJECT_JEL("ENRICH/MISSING/SUBJECT/JEL"), + ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"), + ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"), + ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"), + ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"), // ENRICHMENT MORE - ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT( - "ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT( - "ENRICH/MORE/PROJECT"), ENRICH_MORE_SUBJECT_MESHEUROPMC( - "ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV( - "ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL( - "ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC( - "ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM( - "ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"), + ENRICH_MORE_PID("ENRICH/MORE/PID"), + ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), + ENRICH_MORE_ABSTRACT("ENRICH/MORE/ABSTRACT"), + ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), + ENRICH_MORE_PROJECT("ENRICH/MORE/PROJECT"), + ENRICH_MORE_SUBJECT_MESHEUROPMC("ENRICH/MORE/SUBJECT/MESHEUROPMC"), + ENRICH_MORE_SUBJECT_ARXIV("ENRICH/MORE/SUBJECT/ARXIV"), + ENRICH_MORE_SUBJECT_JEL("ENRICH/MORE/SUBJECT/JEL"), + ENRICH_MORE_SUBJECT_DDC("ENRICH/MORE/SUBJECT/DDC"), + ENRICH_MORE_SUBJECT_ACM("ENRICH/MORE/SUBJECT/ACM"), + ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"), // ADDITION - ADD_BY_PROJECT("ADD/BY_PROJECT"); + ADD_BY_PROJECT("ADD/BY_PROJECT"), + + // OTHER RELS + ENRICH_MISSING_PUBLICATION_IS_RELATED_TO("ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"), + ENRICH_MISSING_PUBLICATION_REFERENCES("ENRICH/MISSING/PUBLICATION/REFERENCES"), + ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY("ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"), + ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"), + ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"), + + ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"), + ENRICH_MISSING_DATASET_REFERENCES("ENRICH/MISSING/DATASET/REFERENCES"), + ENRICH_MISSING_DATASET_IS_REFERENCED_BY("ENRICH/MISSING/DATASET/IS_REFERENCED_BY"), + ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"), + ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY"); Topic(final String path) { this.path = path; @@ -42,9 +59,7 @@ public enum Topic { public static Topic fromPath(final String path) { for (final Topic t : Topic.values()) { - if (t.getPath().equals(path)) { - return t; - } + if (t.getPath().equals(path)) { return t; } } return null; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 43ebd6dd84..5fdd109252 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -9,11 +9,21 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.EventFactory; @@ -30,7 +40,11 @@ import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; public class GenerateEventsApplication { @@ -47,12 +61,13 @@ public class GenerateEventsApplication { private static final UpdateMatcher enrichMorePid = new EnrichMorePid(); private static final UpdateMatcher enrichMoreSubject = new EnrichMoreSubject(); + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); + .toString(GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -67,10 +82,23 @@ public class GenerateEventsApplication { final String eventsPath = parser.get("eventsPath"); log.info("eventsPath: {}", eventsPath); + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + final SparkConf conf = new SparkConf(); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); removeOutputDir(spark, eventsPath); - generateEvents(spark, graphPath, eventsPath); + + final JavaRDD eventsRdd = sc.emptyRDD(); + + eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class)); + eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class)); + eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class)); + eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class)); + + eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class); }); } @@ -79,11 +107,34 @@ public class GenerateEventsApplication { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) { - // TODO + private static JavaRDD generateSimpleEvents(final SparkSession spark, + final String graphPath, + final Class resultClazz) { + + final Dataset results = + readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz) + .filter(r -> r.getDataInfo().getDeletedbyinference()); + + final Dataset rels = + readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelClass().equals("TODO")); // TODO mergedIN + + final Column c = null; // TODO + + final Dataset aa = results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner") + .groupBy(rels.col("target")) + .agg(c) + .filter(x -> x.size() > 1) + // generateSimpleEvents(...) + // flatMap() + // toRdd() + ; + + return null; + } - private List generateEvents(final Result... children) { + private List generateSimpleEvents(final Result... children) { final List> list = new ArrayList<>(); for (final Result target : children) { @@ -102,4 +153,13 @@ public class GenerateEventsApplication { return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); } + public static Dataset readPath( + final SparkSession spark, + final String inputPath, + final Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/EnrichMissingPublicationDate.java index e9ec082c46..372a4e4c9d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/EnrichMissingPublicationDate.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/EnrichMissingPublicationDate.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.broker.oa.matchers; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -16,12 +17,15 @@ public class EnrichMissingPublicationDate extends UpdateMatcher { @Override protected List> findUpdates(final Result source, final Result target) { - // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); - return Arrays.asList(); + if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) { + return Arrays.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target)); + } + return new ArrayList<>(); } @Override - public UpdateInfo generateUpdateInfo(final String highlightValue, final Result source, + public UpdateInfo generateUpdateInfo(final String highlightValue, + final Result source, final Result target) { return new UpdateInfo<>( Topic.ENRICH_MISSING_PUBLICATION_DATE, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index b8b6132cd1..d91b03200e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -30,8 +30,7 @@ public abstract class UpdateMatcher { if (source != res) { for (final UpdateInfo info : findUpdates(source, res)) { final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); - if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { - } else { + if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {} else { infoMap.put(s, info); } } @@ -54,11 +53,16 @@ public abstract class UpdateMatcher { protected abstract List> findUpdates(Result source, Result target); - protected abstract UpdateInfo generateUpdateInfo(final T highlightValue, final Result source, + protected abstract UpdateInfo generateUpdateInfo(final T highlightValue, + final Result source, final Result target); protected static boolean isMissing(final List> list) { return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue()); } + protected boolean isMissing(final Field field) { + return field == null || StringUtils.isBlank(field.getValue()); + } + }