partial implementation of generation of simple events
This commit is contained in:
parent
b71fbb68b1
commit
85ca5622d4
|
@ -4,31 +4,48 @@ package eu.dnetlib.dhp.broker.model;
|
|||
public enum Topic {
|
||||
|
||||
// ENRICHMENT MISSING
|
||||
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT(
|
||||
"ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE(
|
||||
"ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID(
|
||||
"ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE(
|
||||
"ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC(
|
||||
"ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV(
|
||||
"ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL(
|
||||
"ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC(
|
||||
"ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM(
|
||||
"ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK(
|
||||
"ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID(
|
||||
"ENRICH/MISSING/AUTHOR/ORCID"),
|
||||
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"),
|
||||
ENRICH_MISSING_ABSTRACT("ENRICH/MISSING/ABSTRACT"),
|
||||
ENRICH_MISSING_PUBLICATION_DATE("ENRICH/MISSING/PUBLICATION_DATE"),
|
||||
ENRICH_MISSING_PID("ENRICH/MISSING/PID"),
|
||||
ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"),
|
||||
ENRICH_MISSING_SOFTWARE("ENRICH/MISSING/SOFTWARE"),
|
||||
ENRICH_MISSING_SUBJECT_MESHEUROPMC("ENRICH/MISSING/SUBJECT/MESHEUROPMC"),
|
||||
ENRICH_MISSING_SUBJECT_ARXIV("ENRICH/MISSING/SUBJECT/ARXIV"),
|
||||
ENRICH_MISSING_SUBJECT_JEL("ENRICH/MISSING/SUBJECT/JEL"),
|
||||
ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"),
|
||||
ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"),
|
||||
ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"),
|
||||
ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"),
|
||||
|
||||
// ENRICHMENT MORE
|
||||
ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT(
|
||||
"ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT(
|
||||
"ENRICH/MORE/PROJECT"), ENRICH_MORE_SUBJECT_MESHEUROPMC(
|
||||
"ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV(
|
||||
"ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL(
|
||||
"ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC(
|
||||
"ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM(
|
||||
"ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
|
||||
ENRICH_MORE_PID("ENRICH/MORE/PID"),
|
||||
ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"),
|
||||
ENRICH_MORE_ABSTRACT("ENRICH/MORE/ABSTRACT"),
|
||||
ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"),
|
||||
ENRICH_MORE_PROJECT("ENRICH/MORE/PROJECT"),
|
||||
ENRICH_MORE_SUBJECT_MESHEUROPMC("ENRICH/MORE/SUBJECT/MESHEUROPMC"),
|
||||
ENRICH_MORE_SUBJECT_ARXIV("ENRICH/MORE/SUBJECT/ARXIV"),
|
||||
ENRICH_MORE_SUBJECT_JEL("ENRICH/MORE/SUBJECT/JEL"),
|
||||
ENRICH_MORE_SUBJECT_DDC("ENRICH/MORE/SUBJECT/DDC"),
|
||||
ENRICH_MORE_SUBJECT_ACM("ENRICH/MORE/SUBJECT/ACM"),
|
||||
ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
|
||||
|
||||
// ADDITION
|
||||
ADD_BY_PROJECT("ADD/BY_PROJECT");
|
||||
ADD_BY_PROJECT("ADD/BY_PROJECT"),
|
||||
|
||||
// OTHER RELS
|
||||
ENRICH_MISSING_PUBLICATION_IS_RELATED_TO("ENRICH/MISSING/PUBLICATION/IS_RELATED_TO"),
|
||||
ENRICH_MISSING_PUBLICATION_REFERENCES("ENRICH/MISSING/PUBLICATION/REFERENCES"),
|
||||
ENRICH_MISSING_PUBLICATION_IS_REFERENCED_BY("ENRICH/MISSING/PUBLICATION/IS_REFERENCED_BY"),
|
||||
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_TO("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_TO"),
|
||||
ENRICH_MISSING_PUBLICATION_IS_SUPPLEMENTED_BY("ENRICH/MISSING/PUBLICATION/IS_SUPPLEMENTED_BY"),
|
||||
|
||||
ENRICH_MISSING_DATASET_IS_RELATED_TO("ENRICH/MISSING/DATASET/IS_RELATED_TO"),
|
||||
ENRICH_MISSING_DATASET_REFERENCES("ENRICH/MISSING/DATASET/REFERENCES"),
|
||||
ENRICH_MISSING_DATASET_IS_REFERENCED_BY("ENRICH/MISSING/DATASET/IS_REFERENCED_BY"),
|
||||
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_TO("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_TO"),
|
||||
ENRICH_MISSING_DATASET_IS_SUPPLEMENTED_BY("ENRICH/MISSING/DATASET/IS_SUPPLEMENTED_BY");
|
||||
|
||||
Topic(final String path) {
|
||||
this.path = path;
|
||||
|
@ -42,9 +59,7 @@ public enum Topic {
|
|||
|
||||
public static Topic fromPath(final String path) {
|
||||
for (final Topic t : Topic.values()) {
|
||||
if (t.getPath().equals(path)) {
|
||||
return t;
|
||||
}
|
||||
if (t.getPath().equals(path)) { return t; }
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -9,11 +9,21 @@ import java.util.Optional;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Column;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||
|
@ -30,7 +40,11 @@ import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSubject;
|
|||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
||||
public class GenerateEventsApplication {
|
||||
|
||||
|
@ -47,11 +61,12 @@ public class GenerateEventsApplication {
|
|||
private static final UpdateMatcher<?> enrichMorePid = new EnrichMorePid();
|
||||
private static final UpdateMatcher<?> enrichMoreSubject = new EnrichMoreSubject();
|
||||
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateEventsApplication.class
|
||||
.toString(GenerateEventsApplication.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
|
@ -67,10 +82,23 @@ public class GenerateEventsApplication {
|
|||
final String eventsPath = parser.get("eventsPath");
|
||||
log.info("eventsPath: {}", eventsPath);
|
||||
|
||||
final String resultClassName = parser.get("resultTableName");
|
||||
log.info("resultTableName: {}", resultClassName);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
removeOutputDir(spark, eventsPath);
|
||||
generateEvents(spark, graphPath, eventsPath);
|
||||
|
||||
final JavaRDD<Event> eventsRdd = sc.emptyRDD();
|
||||
|
||||
eventsRdd.union(generateSimpleEvents(spark, graphPath, Publication.class));
|
||||
eventsRdd.union(generateSimpleEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class));
|
||||
eventsRdd.union(generateSimpleEvents(spark, graphPath, Software.class));
|
||||
eventsRdd.union(generateSimpleEvents(spark, graphPath, OtherResearchProduct.class));
|
||||
|
||||
eventsRdd.saveAsTextFile(eventsPath, GzipCodec.class);
|
||||
});
|
||||
|
||||
}
|
||||
|
@ -79,11 +107,34 @@ public class GenerateEventsApplication {
|
|||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
private static void generateEvents(final SparkSession spark, final String graphPath, final String eventsPath) {
|
||||
// TODO
|
||||
private static <R extends Result> JavaRDD<Event> generateSimpleEvents(final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<R> resultClazz) {
|
||||
|
||||
final Dataset<R> results =
|
||||
readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference());
|
||||
|
||||
final Dataset<Relation> rels =
|
||||
readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals("TODO")); // TODO mergedIN
|
||||
|
||||
final Column c = null; // TODO
|
||||
|
||||
final Dataset<Row> aa = results.joinWith(rels, results.col("id").equalTo(rels.col("source")), "inner")
|
||||
.groupBy(rels.col("target"))
|
||||
.agg(c)
|
||||
.filter(x -> x.size() > 1)
|
||||
// generateSimpleEvents(...)
|
||||
// flatMap()
|
||||
// toRdd()
|
||||
;
|
||||
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
private List<Event> generateEvents(final Result... children) {
|
||||
private List<Event> generateSimpleEvents(final Result... children) {
|
||||
final List<UpdateInfo<?>> list = new ArrayList<>();
|
||||
|
||||
for (final Result target : children) {
|
||||
|
@ -102,4 +153,13 @@ public class GenerateEventsApplication {
|
|||
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
final SparkSession spark,
|
||||
final String inputPath,
|
||||
final Class<R> clazz) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(inputPath)
|
||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa.matchers;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -16,12 +17,15 @@ public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
|
|||
|
||||
@Override
|
||||
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target) {
|
||||
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
|
||||
return Arrays.asList();
|
||||
if (isMissing(target.getDateofacceptance()) && !isMissing(source.getDateofacceptance())) {
|
||||
return Arrays.asList(generateUpdateInfo(source.getDateofacceptance().getValue(), source, target));
|
||||
}
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateInfo<String> generateUpdateInfo(final String highlightValue, final Result source,
|
||||
public UpdateInfo<String> generateUpdateInfo(final String highlightValue,
|
||||
final Result source,
|
||||
final Result target) {
|
||||
return new UpdateInfo<>(
|
||||
Topic.ENRICH_MISSING_PUBLICATION_DATE,
|
||||
|
|
|
@ -30,8 +30,7 @@ public abstract class UpdateMatcher<T> {
|
|||
if (source != res) {
|
||||
for (final UpdateInfo<T> info : findUpdates(source, res)) {
|
||||
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
|
||||
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
|
||||
} else {
|
||||
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {} else {
|
||||
infoMap.put(s, info);
|
||||
}
|
||||
}
|
||||
|
@ -54,11 +53,16 @@ public abstract class UpdateMatcher<T> {
|
|||
|
||||
protected abstract List<UpdateInfo<T>> findUpdates(Result source, Result target);
|
||||
|
||||
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue, final Result source,
|
||||
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue,
|
||||
final Result source,
|
||||
final Result target);
|
||||
|
||||
protected static boolean isMissing(final List<Field<String>> list) {
|
||||
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());
|
||||
}
|
||||
|
||||
protected boolean isMissing(final Field<String> field) {
|
||||
return field == null || StringUtils.isBlank(field.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue