diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml index 59fd30fead..1538318c14 100644 --- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + postgresURL @@ -102,7 +102,7 @@ - + ${jobTracker} ${nameNode} @@ -113,7 +113,7 @@ - + ${jobTracker} ${nameNode} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 0694556b2e..9e5d986448 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -29,31 +29,32 @@ public class EventFactory { "yyyy-MM-dd" }; - public static Event newBrokerEvent(final Result source, final Result target, final UpdateInfo updateInfo) { + public static Event newBrokerEvent(final UpdateInfo updateInfo) { final long now = new Date().getTime(); final Event res = new Event(); - final Map map = createMapFromResult(target, source, updateInfo); + final Map map = createMapFromResult(updateInfo); - final String payload = createPayload(target, updateInfo); + final String payload = createPayload(updateInfo); final String eventId = calculateEventId( - updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString()); + updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), + updateInfo.getHighlightValueAsString()); res.setEventId(eventId); res.setProducerId(PRODUCER_ID); res.setPayload(payload); res.setMap(map); - res.setTopic(updateInfo.getTopic()); + res.setTopic(updateInfo.getTopicPath()); res.setCreationDate(now); res.setExpiryDate(calculateExpiryDate(now)); res.setInstantMessage(false); return res; } - private static String createPayload(final Result result, final UpdateInfo updateInfo) { + private static String createPayload(final UpdateInfo updateInfo) { final OpenAireEventPayload payload = new OpenAireEventPayload(); // TODO @@ -62,32 +63,34 @@ public class EventFactory { return payload.toJSON(); } - private static Map createMapFromResult(final Result oaf, final Result source, - final UpdateInfo updateInfo) { + private static Map createMapFromResult(final UpdateInfo updateInfo) { final Map map = new HashMap<>(); - final List collectedFrom = oaf.getCollectedfrom(); + final Result source = updateInfo.getSource(); + final Result target = updateInfo.getTarget(); + + final List collectedFrom = target.getCollectedfrom(); if (collectedFrom.size() == 1) { map.put("target_datasource_id", collectedFrom.get(0).getKey()); map.put("target_datasource_name", collectedFrom.get(0).getValue()); } - final List ids = oaf.getOriginalId(); + final List ids = target.getOriginalId(); if (ids.size() > 0) { map.put("target_publication_id", ids.get(0)); } - final List titles = oaf.getTitle(); + final List titles = target.getTitle(); if (titles.size() > 0) { map.put("target_publication_title", titles.get(0)); } - final long date = parseDateTolong(oaf.getDateofacceptance().getValue()); + final long date = parseDateTolong(target.getDateofacceptance().getValue()); if (date > 0) { map.put("target_dateofacceptance", date); } - final List subjects = oaf.getSubject(); + final List subjects = target.getSubject(); if (subjects.size() > 0) { map .put( @@ -95,7 +98,7 @@ public class EventFactory { subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); } - final List authors = oaf.getAuthor(); + final List authors = target.getAuthor(); if (authors.size() > 0) { map .put( diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java new file mode 100644 index 0000000000..29f6cbe3af --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/Topic.java @@ -0,0 +1,52 @@ + +package eu.dnetlib.dhp.broker.model; + +public enum Topic { + + // ENRICHMENT MISSING + ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT( + "ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE( + "ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID( + "ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE( + "ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC( + "ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV( + "ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL( + "ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC( + "ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM( + "ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK( + "ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID( + "ENRICH/MISSING/AUTHOR/ORCID"), + + // ENRICHMENT MORE + ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT( + "ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT( + "ENRICH/MORE/PROJECT"), ENRICH_MORE_SUBJECT_MESHEUROPMC( + "ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV( + "ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL( + "ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC( + "ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM( + "ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"), + + // ADDITION + ADD_BY_PROJECT("ADD/BY_PROJECT"); + + Topic(final String path) { + this.path = path; + } + + protected String path; + + public String getPath() { + return this.path; + } + + public static Topic fromPath(final String path) { + for (final Topic t : Topic.values()) { + if (t.getPath().equals(path)) { + return t; + } + } + return null; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 54d4ef36aa..c4c167c13f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -14,8 +14,6 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.model.EventFactory; @@ -30,6 +28,7 @@ import eu.dnetlib.dhp.broker.oa.util.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.util.EnrichMorePid; import eu.dnetlib.dhp.broker.oa.util.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; +import eu.dnetlib.dhp.broker.oa.util.UpdateMatcher; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.oaf.Result; @@ -37,7 +36,16 @@ public class GenerateEventsApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final UpdateMatcher enrichMissingAbstract = new EnrichMissingAbstract(); + private static final UpdateMatcher enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid(); + private static final UpdateMatcher enrichMissingOpenAccess = new EnrichMissingOpenAccess(); + private static final UpdateMatcher enrichMissingPid = new EnrichMissingPid(); + private static final UpdateMatcher enrichMissingProject = new EnrichMissingProject(); + private static final UpdateMatcher enrichMissingPublicationDate = new EnrichMissingPublicationDate(); + private static final UpdateMatcher enrichMissingSubject = new EnrichMissingSubject(); + private static final UpdateMatcher enrichMoreOpenAccess = new EnrichMoreOpenAccess(); + private static final UpdateMatcher enrichMorePid = new EnrichMorePid(); + private static final UpdateMatcher enrichMoreSubject = new EnrichMoreSubject(); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -76,37 +84,22 @@ public class GenerateEventsApplication { } private List generateEvents(final Result... children) { - final List list = new ArrayList<>(); + final List> list = new ArrayList<>(); - for (final Result source : children) { - for (final Result target : children) { - if (source != target) { - list - .addAll( - findUpdates(source, target) - .stream() - .map(info -> EventFactory.newBrokerEvent(source, target, info)) - .collect(Collectors.toList())); - } - } + for (final Result target : children) { + list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children)); + list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children)); + list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children)); + list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children)); + list.addAll(enrichMissingProject.searchUpdatesForRecord(target, children)); + list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children)); + list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children)); + list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children)); + list.addAll(enrichMorePid.searchUpdatesForRecord(target, children)); + list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children)); } - return list; - } - - private List> findUpdates(final Result source, final Result target) { - final List> list = new ArrayList<>(); - list.addAll(EnrichMissingAbstract.findUpdates(source, target)); - list.addAll(EnrichMissingAuthorOrcid.findUpdates(source, target)); - list.addAll(EnrichMissingOpenAccess.findUpdates(source, target)); - list.addAll(EnrichMissingPid.findUpdates(source, target)); - list.addAll(EnrichMissingProject.findUpdates(source, target)); - list.addAll(EnrichMissingPublicationDate.findUpdates(source, target)); - list.addAll(EnrichMissingSubject.findUpdates(source, target)); - list.addAll(EnrichMoreOpenAccess.findUpdates(source, target)); - list.addAll(EnrichMorePid.findUpdates(source, target)); - list.addAll(EnrichMoreSubject.findUpdates(source, target)); - return list; + return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java index 493d1f97c7..6b6e35d1d6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAbstract.java @@ -1,31 +1,35 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingAbstract extends UpdateInfo { +public class EnrichMissingAbstract extends UpdateMatcher { - public static List findUpdates(final Result source, final Result target) { - // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); - return Arrays.asList(); - } - - private EnrichMissingAbstract(final String highlightValue, final float trust) { - super("ENRICH/MISSING/ABSTRACT", highlightValue, trust); + public EnrichMissingAbstract() { + super(false); } @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getAbstracts().add(getHighlightValue()); + protected List> findUpdates(final Result source, final Result target) { + if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) { + return Arrays.asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target)); + } + return new ArrayList<>(); } @Override - public String getHighlightValueAsString() { - return getHighlightValue(); + public UpdateInfo generateUpdateInfo(final String highlightValue, final Result source, + final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MISSING_ABSTRACT, + highlightValue, source, target, + (p, s) -> p.getAbstracts().add(s), + s -> s); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java index 6899c62a37..d81427e05c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingAuthorOrcid.java @@ -4,28 +4,30 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import org.apache.commons.lang3.tuple.Pair; + +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingAuthorOrcid extends UpdateInfo { +public class EnrichMissingAuthorOrcid extends UpdateMatcher> { - public static List findUpdates(final Result source, final Result target) { + public EnrichMissingAuthorOrcid() { + super(true); + } + + @Override + protected List>> findUpdates(final Result source, final Result target) { // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMissingAuthorOrcid(final String highlightValue, final float trust) { - super("ENRICH/MISSING/AUTHOR/ORCID", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - // TODO + public UpdateInfo> generateUpdateInfo(final Pair highlightValue, + final Result source, final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MISSING_AUTHOR_ORCID, + highlightValue, source, target, + (p, pair) -> p.getCreators().add(pair.getLeft() + " - ORCID: " + pair.getRight()), + pair -> pair.getLeft() + "::" + pair.getRight()); } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue(); - } - } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java index 9464130f31..9079ee24b0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingOpenAccess.java @@ -5,28 +5,29 @@ import java.util.Arrays; import java.util.List; import eu.dnetlib.broker.objects.Instance; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingOpenAccess extends UpdateInfo { +public class EnrichMissingOpenAccess extends UpdateMatcher { + + public EnrichMissingOpenAccess() { + super(true); + } + + @Override + protected List> findUpdates(final Result source, final Result target) { - public static List findUpdates(final Result source, final Result target) { - // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMissingOpenAccess(final Instance highlightValue, final float trust) { - super("ENRICH/MISSING/OPENACCESS_VERSION", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getInstances().add(getHighlightValue()); - } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue().getUrl(); + public UpdateInfo generateUpdateInfo(final Instance highlightValue, final Result source, + final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MISSING_OA_VERSION, + highlightValue, source, target, + (p, i) -> p.getInstances().add(i), + Instance::getUrl); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java index 293d4993f3..0b4045a0e4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPid.java @@ -4,29 +4,29 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; import eu.dnetlib.broker.objects.Pid; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingPid extends UpdateInfo { +public class EnrichMissingPid extends UpdateMatcher { - public static List findUpdates(final Result source, final Result target) { + public EnrichMissingPid() { + super(true); + } + + @Override + protected List> findUpdates(final Result source, final Result target) { // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMissingPid(final Pid highlightValue, final float trust) { - super("ENRICH/MISSING/PID", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getPids().add(getHighlightValue()); - } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue().getType() + "::" + getHighlightValue().getValue(); + public UpdateInfo generateUpdateInfo(final Pid highlightValue, final Result source, final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MISSING_PID, + highlightValue, source, target, + (p, pid) -> p.getPids().add(pid), + pid -> pid.getType() + "::" + pid.getValue()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java index a22c179a20..45b16801c2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingProject.java @@ -4,30 +4,30 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; import eu.dnetlib.broker.objects.Project; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingProject extends UpdateInfo { +public class EnrichMissingProject extends UpdateMatcher { - public static List findUpdates(final Result source, final Result target) { + public EnrichMissingProject() { + super(true); + } + + @Override + protected List> findUpdates(final Result source, final Result target) { // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMissingProject(final Project highlightValue, final float trust) { - super("ENRICH/MISSING/PROJECT", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getProjects().add(getHighlightValue()); - } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue().getFunder() + "::" + getHighlightValue().getFundingProgram() - + getHighlightValue().getCode(); + public UpdateInfo generateUpdateInfo(final Project highlightValue, final Result source, + final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MISSING_PROJECT, + highlightValue, source, target, + (p, prj) -> p.getProjects().add(prj), + prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java index 869dca2645..7fcd2a66f9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingPublicationDate.java @@ -4,28 +4,29 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingPublicationDate extends UpdateInfo { +public class EnrichMissingPublicationDate extends UpdateMatcher { - public static List findUpdates(final Result source, final Result target) { + public EnrichMissingPublicationDate() { + super(false); + } + + @Override + protected List> findUpdates(final Result source, final Result target) { // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMissingPublicationDate(final String highlightValue, final float trust) { - super("ENRICH/MISSING/PUBLICATION_DATE", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().setPublicationdate(getHighlightValue()); - } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue(); + public UpdateInfo generateUpdateInfo(final String highlightValue, final Result source, + final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MISSING_PUBLICATION_DATE, + highlightValue, source, target, + (p, date) -> p.setPublicationdate(date), + s -> s); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java index a2ed5d0439..4470bd9d98 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMissingSubject.java @@ -4,12 +4,19 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import org.apache.commons.lang3.tuple.Pair; + +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMissingSubject extends UpdateInfo { +public class EnrichMissingSubject extends UpdateMatcher> { - public static List findUpdates(final Result source, final Result target) { + public EnrichMissingSubject() { + super(true); + } + + @Override + protected List>> findUpdates(final Result source, final Result target) { // MESHEUROPMC // ARXIV // JEL @@ -19,18 +26,15 @@ public class EnrichMissingSubject extends UpdateInfo { return Arrays.asList(); } - private EnrichMissingSubject(final String subjectClassification, final String highlightValue, final float trust) { - super("ENRICH/MISSING/SUBJECT/" + subjectClassification, highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getSubjects().add(getHighlightValue()); - } + public UpdateInfo> generateUpdateInfo(final Pair highlightValue, + final Result source, final Result target) { - @Override - public String getHighlightValueAsString() { - return getHighlightValue(); + return new UpdateInfo<>( + Topic.fromPath("ENRICH/MISSING/SUBJECT/" + highlightValue.getLeft()), + highlightValue, source, target, + (p, pair) -> p.getSubjects().add(pair.getRight()), + pair -> pair.getLeft() + "::" + pair.getRight()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java index 4f1e88d3d5..bc37ce659a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreOpenAccess.java @@ -5,28 +5,29 @@ import java.util.Arrays; import java.util.List; import eu.dnetlib.broker.objects.Instance; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMoreOpenAccess extends UpdateInfo { +public class EnrichMoreOpenAccess extends UpdateMatcher { - public static List findUpdates(final Result source, final Result target) { + public EnrichMoreOpenAccess() { + super(true); + } + + @Override + protected List> findUpdates(final Result source, final Result target) { // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMoreOpenAccess(final Instance highlightValue, final float trust) { - super("ENRICH/MORE/OPENACCESS_VERSION", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getInstances().add(getHighlightValue()); - } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue().getUrl(); + public UpdateInfo generateUpdateInfo(final Instance highlightValue, final Result source, + final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MORE_OA_VERSION, + highlightValue, source, target, + (p, i) -> p.getInstances().add(i), + Instance::getUrl); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java index ecf2cf3107..8cd67f5536 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMorePid.java @@ -4,29 +4,29 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; import eu.dnetlib.broker.objects.Pid; +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMorePid extends UpdateInfo { +public class EnrichMorePid extends UpdateMatcher { - public static List findUpdates(final Result source, final Result target) { + public EnrichMorePid() { + super(true); + } + + @Override + protected List> findUpdates(final Result source, final Result target) { // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); return Arrays.asList(); } - private EnrichMorePid(final Pid highlightValue, final float trust) { - super("ENRICH/MORE/PID", highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getPids().add(getHighlightValue()); - } - - @Override - public String getHighlightValueAsString() { - return getHighlightValue().getType() + "::" + getHighlightValue().getValue(); + public UpdateInfo generateUpdateInfo(final Pid highlightValue, final Result source, final Result target) { + return new UpdateInfo<>( + Topic.ENRICH_MORE_PID, + highlightValue, source, target, + (p, pid) -> p.getPids().add(pid), + pid -> pid.getType() + "::" + pid.getValue()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java index f29b86292d..9e0d8e6939 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EnrichMoreSubject.java @@ -4,12 +4,19 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.Arrays; import java.util.List; -import eu.dnetlib.broker.objects.OpenAireEventPayload; +import org.apache.commons.lang3.tuple.Pair; + +import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.schema.oaf.Result; -public class EnrichMoreSubject extends UpdateInfo { +public class EnrichMoreSubject extends UpdateMatcher> { - public static List findUpdates(final Result source, final Result target) { + public EnrichMoreSubject() { + super(true); + } + + @Override + protected List>> findUpdates(final Result source, final Result target) { // MESHEUROPMC // ARXIV // JEL @@ -19,18 +26,15 @@ public class EnrichMoreSubject extends UpdateInfo { return Arrays.asList(); } - private EnrichMoreSubject(final String subjectClassification, final String highlightValue, final float trust) { - super("ENRICH/MORE/SUBJECT/" + subjectClassification, highlightValue, trust); - } - @Override - public void compileHighlight(final OpenAireEventPayload payload) { - payload.getHighlight().getSubjects().add(getHighlightValue()); - } + public UpdateInfo> generateUpdateInfo(final Pair highlightValue, + final Result source, final Result target) { - @Override - public String getHighlightValueAsString() { - return getHighlightValue(); + return new UpdateInfo<>( + Topic.fromPath("ENRICH/MORE/SUBJECT/" + highlightValue.getLeft()), + highlightValue, source, target, + (p, pair) -> p.getSubjects().add(pair.getRight()), + pair -> pair.getLeft() + "::" + pair.getRight()); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index f7b6b69e9e..1dfc14e5eb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -1,36 +1,77 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.util.function.BiConsumer; +import java.util.function.Function; + import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.broker.objects.Publication; +import eu.dnetlib.dhp.broker.model.Topic; +import eu.dnetlib.dhp.schema.oaf.Result; -public abstract class UpdateInfo { +public final class UpdateInfo { - private final String topic; + private final Topic topic; private final T highlightValue; + private final Result source; + + private final Result target; + + private final BiConsumer compileHighlight; + + private final Function highlightToString; + private final float trust; - protected UpdateInfo(final String topic, final T highlightValue, final float trust) { + protected UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target, + final BiConsumer compileHighlight, + final Function highlightToString) { this.topic = topic; this.highlightValue = highlightValue; - this.trust = trust; + this.source = source; + this.target = target; + this.compileHighlight = compileHighlight; + this.highlightToString = highlightToString; + this.trust = calculateTrust(source, target); } public T getHighlightValue() { return highlightValue; } + public Result getSource() { + return source; + } + + public Result getTarget() { + return target; + } + + private float calculateTrust(final Result source, final Result target) { + // TODO + return 0.9f; + } + + protected Topic getTopic() { + return topic; + } + + public String getTopicPath() { + return topic.getPath(); + } + public float getTrust() { return trust; } - public String getTopic() { - return topic; + public void compileHighlight(final OpenAireEventPayload payload) { + compileHighlight.accept(payload.getHighlight(), getHighlightValue()); } - abstract public void compileHighlight(OpenAireEventPayload payload); - - abstract public String getHighlightValueAsString(); + public String getHighlightValueAsString() { + return highlightToString.apply(getHighlightValue()); + } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateMatcher.java new file mode 100644 index 0000000000..3fd6d40276 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateMatcher.java @@ -0,0 +1,63 @@ + +package eu.dnetlib.dhp.broker.oa.util; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.StringUtils; + +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Result; + +public abstract class UpdateMatcher { + + private final boolean multipleUpdate; + + public UpdateMatcher(final boolean multipleUpdate) { + this.multipleUpdate = multipleUpdate; + } + + public Collection> searchUpdatesForRecord(final Result res, final Result... others) { + + final Map> infoMap = new HashMap<>(); + + for (final Result source : others) { + if (source != res) { + for (final UpdateInfo info : findUpdates(source, res)) { + final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); + if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { + } else { + infoMap.put(s, info); + } + } + } + } + + final Collection> values = infoMap.values(); + + if (values.isEmpty() || multipleUpdate) { + return values; + } else { + final UpdateInfo v = values + .stream() + .sorted((o1, o2) -> Float.compare(o1.getTrust(), o2.getTrust())) + .findFirst() + .get(); + return Arrays.asList(v); + } + } + + protected abstract List> findUpdates(Result source, Result target); + + protected abstract UpdateInfo generateUpdateInfo(final T highlightValue, final Result source, + final Result target); + + protected static boolean isMissing(final List> list) { + return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue()); + } + +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 75d85e2bad..1c65e8adec 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.bulktag; +import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Optional; @@ -84,6 +85,7 @@ public class SparkBulkTagJob { conf, isSparkSessionManaged, spark -> { + removeOutputDir(spark, outputPath); execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 9dc17701bc..974b3a3b11 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -69,13 +69,16 @@ public class SparkCountryPropagationJob { runWithSparkSession( conf, isSparkSessionManaged, - spark -> execPropagation( - spark, - sourcePath, - preparedInfoPath, - outputPath, - resultClazz, - saveGraph)); + spark -> { + removeOutputDir(spark, outputPath); + execPropagation( + spark, + sourcePath, + preparedInfoPath, + outputPath, + resultClazz, + saveGraph); + }); } private static void execPropagation( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java index 7cd057cf39..6549d1ed29 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java @@ -74,9 +74,7 @@ public class PrepareResultOrcidAssociationStep1 { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } + removeOutputDir(spark, outputPath); prepareInfo( spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel); }); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java index 65d8811bc7..2cea32e58f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java @@ -50,9 +50,7 @@ public class PrepareResultOrcidAssociationStep2 { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } + removeOutputDir(spark, outputPath); mergeInfo(spark, inputPath, outputPath); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index b93b66d9f9..fd1de3282d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -70,11 +70,10 @@ public class SparkOrcidToResultFromSemRelJob { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) + removeOutputDir(spark, outputPath); + if (saveGraph) { execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz); + } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java index 05dcdc6928..c27da42583 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java @@ -60,6 +60,8 @@ public class PrepareProjectResultsAssociation { conf, isSparkSessionManaged, spark -> { + removeOutputDir(spark, potentialUpdatePath); + removeOutputDir(spark, alreadyLinkedPath); prepareResultProjProjectResults( spark, inputPath, diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java index e2d4d56878..90eb54e5fa 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java @@ -55,9 +55,7 @@ public class PrepareResultCommunitySet { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } + removeOutputDir(spark, outputPath); prepareInfo(spark, inputPath, outputPath, organizationMap); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 71275cc7f0..66297e1779 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -68,11 +68,10 @@ public class SparkResultToCommunityFromOrganizationJob { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) + removeOutputDir(spark, outputPath); + if (saveGraph) { execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath); + } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java index f8fe1668fd..5f549be531 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java @@ -58,30 +58,15 @@ public class PrepareResultInstRepoAssociation { isSparkSessionManaged, spark -> { readNeededResources(spark, inputPath); + + removeOutputDir(spark, datasourceOrganizationPath); prepareDatasourceOrganization(spark, datasourceOrganizationPath); + + removeOutputDir(spark, alreadyLinkedPath); prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath); }); } - private static void prepareAlreadyLinkedAssociation( - SparkSession spark, String alreadyLinkedPath) { - String query = "Select source resultId, collect_set(target) organizationSet " - + "from relation " - + "where datainfo.deletedbyinference = false " - + "and relClass = '" - + RELATION_RESULT_ORGANIZATION_REL_CLASS - + "' " - + "group by source"; - - spark - .sql(query) - .as(Encoders.bean(ResultOrganizationSet.class)) - // TODO retry to stick with datasets - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); - } - private static void readNeededResources(SparkSession spark, String inputPath) { Dataset datasource = readPath(spark, inputPath + "/datasource", Datasource.class); datasource.createOrReplaceTempView("datasource"); @@ -119,4 +104,24 @@ public class PrepareResultInstRepoAssociation { .option("compression", "gzip") .json(datasourceOrganizationPath); } + + private static void prepareAlreadyLinkedAssociation( + SparkSession spark, String alreadyLinkedPath) { + String query = "Select source resultId, collect_set(target) organizationSet " + + "from relation " + + "where datainfo.deletedbyinference = false " + + "and relClass = '" + + RELATION_RESULT_ORGANIZATION_REL_CLASS + + "' " + + "group by source"; + + spark + .sql(query) + .as(Encoders.bean(ResultOrganizationSet.class)) + // TODO retry to stick with datasets + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); + } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 86634d43fc..13577fa7c1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -83,10 +83,8 @@ public class SparkResultToOrganizationFromIstRepoJob { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) + removeOutputDir(spark, outputPath); + if (saveGraph) { execPropagation( spark, datasourceorganization, @@ -94,6 +92,7 @@ public class SparkResultToOrganizationFromIstRepoJob { inputPath, outputPath, resultClazz); + } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml index 754aba4f29..f019f8413d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml @@ -18,6 +18,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -42,8 +53,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -53,8 +62,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization @@ -64,8 +71,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -75,8 +80,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource @@ -95,8 +98,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-publication @@ -124,8 +125,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-dataset @@ -153,8 +152,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-orp @@ -182,8 +179,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-software diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml index fc877071df..85116e4cc9 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -19,6 +19,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -43,8 +54,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -54,18 +63,15 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization + - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -75,8 +81,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml index e4429b710c..5ddc5fedf4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -57,6 +57,7 @@ + ${jobTracker} @@ -81,7 +82,6 @@ - @@ -230,8 +230,8 @@ - + @@ -271,6 +271,7 @@ + yarn @@ -302,6 +303,7 @@ + yarn @@ -333,6 +335,7 @@ + yarn diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml index 24e1d3b7fa..9e91c06fb3 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml @@ -14,6 +14,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -42,8 +53,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -53,8 +62,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/publication ${nameNode}/${outputPath}/publication @@ -64,8 +71,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/dataset ${nameNode}/${outputPath}/dataset @@ -75,8 +80,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/otherresearchproduct ${nameNode}/${outputPath}/otherresearchproduct @@ -86,28 +89,24 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/software ${nameNode}/${outputPath}/software + - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization + - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -117,8 +116,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml index d481cad052..6a329fdc46 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml @@ -14,6 +14,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -38,8 +49,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -49,8 +58,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization @@ -60,8 +67,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -71,8 +76,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource @@ -101,8 +104,8 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/relation - --hive_metastore_uris${hive_metastore_uris} --outputPath${workingDir}/preparedInfo/resultCommunityList + --hive_metastore_uris${hive_metastore_uris} --organizationtoresultcommunitymap${organizationtoresultcommunitymap} @@ -136,9 +139,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/publication + --outputPath${outputPath}/publication --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${outputPath}/publication --saveGraph${saveGraph} @@ -165,9 +168,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/dataset + --outputPath${outputPath}/dataset --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${outputPath}/dataset --saveGraph${saveGraph} @@ -194,9 +197,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/otherresearchproduct + --outputPath${outputPath}/otherresearchproduct --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${outputPath}/otherresearchproduct --saveGraph${saveGraph} @@ -223,9 +226,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/software + --outputPath${outputPath}/software --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${outputPath}/software --saveGraph${saveGraph} diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml index a1b7f4ad79..e0563abae8 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml @@ -10,6 +10,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -38,8 +49,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -49,8 +58,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/publication ${nameNode}/${outputPath}/publication @@ -60,8 +67,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/dataset ${nameNode}/${outputPath}/dataset @@ -71,8 +76,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/otherresearchproduct ${nameNode}/${outputPath}/otherresearchproduct @@ -82,8 +85,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/software ${nameNode}/${outputPath}/software @@ -93,8 +94,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization @@ -104,8 +103,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -115,8 +112,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource @@ -125,6 +120,7 @@ + yarn @@ -176,12 +172,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/publication - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication @@ -206,12 +202,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/dataset - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset @@ -236,12 +232,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/otherresearchproduct - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct @@ -266,12 +262,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/software - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Software diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index fd12716b42..b9c4e6c804 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -127,7 +127,6 @@ public abstract class AbstractMdRecordToOafMapper { final List oafs = new ArrayList<>(); switch (type.toLowerCase()) { - case "": case "publication": final Publication p = new Publication(); populateResultFields(p, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); @@ -138,7 +137,7 @@ public abstract class AbstractMdRecordToOafMapper { case "dataset": final Dataset d = new Dataset(); populateResultFields(d, doc, collectedFrom, hostedBy, info, lastUpdateTimestamp); - d.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE); + d.setResulttype(DATASET_DEFAULT_RESULTTYPE); d.setStoragedate(prepareDatasetStorageDate(doc, info)); d.setDevice(prepareDatasetDevice(doc, info)); d.setSize(prepareDatasetSize(doc, info)); @@ -158,6 +157,7 @@ public abstract class AbstractMdRecordToOafMapper { s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info)); oafs.add(s); break; + case "": case "otherresearchproducts": default: final OtherResearchProduct o = new OtherResearchProduct();