From 7e82996e7c76ee0c2cb7c420130c95a8fcce3d2e Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 4 Jun 2020 17:10:43 +0200 Subject: [PATCH 1/3] partial implemantation of broker events generation --- .../dhp/broker/model/EventFactory.java | 19 ++---- .../broker/oa/GenerateEventsApplication.java | 42 ++++++++++--- .../relatedProjects/EnrichMissingProject.java | 14 ++++- .../relatedProjects/EnrichMoreProject.java | 18 +++++- .../EnrichMissingSoftware.java | 16 ++++- .../EnrichMoreSoftware.java | 20 ++++-- .../simple/EnrichMissingOpenAccess.java | 6 +- .../matchers/simple/EnrichMoreOpenAccess.java | 2 +- .../dhp/broker/oa/util/ConversionUtils.java | 63 +++++++++++++++++-- 9 files changed, 155 insertions(+), 45 deletions(-) rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/{simple => relatedSoftware}/EnrichMissingSoftware.java (72%) rename dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/{simple => relatedSoftware}/EnrichMoreSoftware.java (66%) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 9e5d98644..609f206b7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -39,9 +39,8 @@ public class EventFactory { final String payload = createPayload(updateInfo); - final String eventId = calculateEventId( - updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), - updateInfo.getHighlightValueAsString()); + final String eventId = + calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString()); res.setEventId(eventId); res.setProducerId(PRODUCER_ID); @@ -56,7 +55,7 @@ public class EventFactory { private static String createPayload(final UpdateInfo updateInfo) { final OpenAireEventPayload payload = new OpenAireEventPayload(); - // TODO + // TODO : use ConversionUtils updateInfo.compileHighlight(payload); @@ -93,17 +92,13 @@ public class EventFactory { final List subjects = target.getSubject(); if (subjects.size() > 0) { map - .put( - "target_publication_subject_list", - subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); + .put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); } final List authors = target.getAuthor(); if (authors.size() > 0) { map - .put( - "target_publication_author_list", - authors.stream().map(Author::getFullname).collect(Collectors.toList())); + .put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList())); } // PROVENANCE INFO @@ -130,9 +125,7 @@ public class EventFactory { } private static long parseDateTolong(final String date) { - if (StringUtils.isBlank(date)) { - return -1; - } + if (StringUtils.isBlank(date)) { return -1; } try { return DateUtils.parseDate(date, DATE_PATTERNS).getTime(); } catch (final ParseException e) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 045104720..fede6f8bf 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -43,20 +43,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublic import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy; import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo; import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences; +import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware; +import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -177,14 +178,6 @@ public class GenerateEventsApplication { } - private static JavaRDD generateRelationEvents(final SparkSession spark, - final String graphPath, - final Class sourceClass, - final Class targetClass) { - // TODO Auto-generated method stub - return null; - } - private List generateSimpleEvents(final Collection children) { final List> list = new ArrayList<>(); @@ -203,6 +196,35 @@ public class GenerateEventsApplication { return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); } + private static JavaRDD generateRelationEvents(final SparkSession spark, + final String graphPath, + final Class sourceClass, + final Class targetClass) { + + final Dataset sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) + .filter(r -> r.getDataInfo().getDeletedbyinference()); + + final Dataset targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); + + final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + + if (targetClass == Project.class) { + // TODO join using: generateProjectsEvents + } else if (targetClass == Software.class) { + // TODO join using: generateSoftwareEvents + } else if (targetClass == Publication.class) { + // TODO join using: generatePublicationRelatedEvents + } else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) { + // TODO join using: generateDatasetRelatedEvents + } + + return null; + } + private List generateProjectsEvents(final Collection>> childrenWithProjects) { final List> list = new ArrayList<>(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index 461266d56..3ce29f864 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -3,11 +3,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; @@ -22,8 +24,16 @@ public class EnrichMissingProject @Override protected List> findUpdates(final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + if (source.getRight().isEmpty()) { + return Arrays.asList(); + } else { + return target.getRight() + .stream() + .map(ConversionUtils::oafProjectToBrokerProject) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); + } } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index d9bfb62d5..70ab5f71c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -1,13 +1,15 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; -import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; @@ -21,8 +23,18 @@ public class EnrichMoreProject extends UpdateMatcher> @Override protected List> findUpdates(final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + final Set existingProjects = source.getRight() + .stream() + .map(Project::getId) + .collect(Collectors.toSet()); + + return target.getRight() + .stream() + .filter(p -> !existingProjects.contains(p.getId())) + .map(ConversionUtils::oafProjectToBrokerProject) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java similarity index 72% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index 4fcba43a4..d0d0f4083 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -1,13 +1,15 @@ -package eu.dnetlib.dhp.broker.oa.matchers.simple; +package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; @@ -23,8 +25,16 @@ public class EnrichMissingSoftware protected List> findUpdates( final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + if (source.getRight().isEmpty()) { + return Arrays.asList(); + } else { + return target.getRight() + .stream() + .map(ConversionUtils::oafSoftwareToBrokerSoftware) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); + } } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java similarity index 66% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index a1affff62..455f08468 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -1,13 +1,15 @@ -package eu.dnetlib.dhp.broker.oa.matchers.simple; +package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; -import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; @@ -23,8 +25,18 @@ public class EnrichMoreSoftware protected List> findUpdates( final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + final Set existingSoftwares = source.getRight() + .stream() + .map(Software::getId) + .collect(Collectors.toSet()); + + return target.getRight() + .stream() + .filter(p -> !existingSoftwares.contains(p.getId())) + .map(ConversionUtils::oafSoftwareToBrokerSoftware) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index c7e9dcbc1..524846d06 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -28,16 +28,14 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .count(); - if (count > 0) { - return Arrays.asList(); - } + if (count > 0) { return Arrays.asList(); } return source .getInstance() .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(s -> s) + .flatMap(List::stream) .map(i -> generateUpdateInfo(i, source, target)) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index c376da44d..f9c5f81da 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -34,7 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher { .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(s -> s) + .flatMap(List::stream) .filter(i -> !urls.contains(i.getUrl())) .map(i -> generateUpdateInfo(i, source, target)) .collect(Collectors.toList()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 2f87d0ee7..4961fc4e4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -1,19 +1,25 @@ package eu.dnetlib.dhp.broker.oa.util; -import java.util.stream.Stream; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.Pid; import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class ConversionUtils { - public static Stream oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) { + public static List oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) { return i.getUrl().stream().map(url -> { final Instance r = new Instance(); r.setUrl(url); @@ -21,7 +27,7 @@ public class ConversionUtils { r.setLicense(BrokerConstants.OPEN_ACCESS); r.setHostedby(i.getHostedby().getValue()); return r; - }); + }).collect(Collectors.toList()); } public static Pid oafPidToBrokerPid(final StructuredProperty sp) { @@ -37,13 +43,60 @@ public class ConversionUtils { public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset(); - // TODO + res.setOriginalId(d.getOriginalId().get(0)); + res.setTitles(structPropList(d.getTitle())); + res.setPids(d.getPid().stream().map(ConversionUtils::structeredPropertyToPid).collect(Collectors.toList())); + res.setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())); + res.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())); return res; } public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) { final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication(); - // TODO + // TODO This should be reusable return res; } + + public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { + final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project(); + res.setTitle(fieldValue(p.getTitle())); + res.setAcronym(fieldValue(p.getAcronym())); + res.setCode(fieldValue(p.getCode())); + res.setFunder(null); // TODO + res.setFundingProgram(null); // TODO + res.setJurisdiction(null); // TODO + return res; + } + + public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { + final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software(); + res.setName(structPropValue(sw.getTitle())); + res.setDescription(fieldValue(sw.getDescription())); + res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); + res.setLandingPage(fieldValue(sw.getDocumentationUrl())); + return res; + } + + private static Pid structeredPropertyToPid(final StructuredProperty sp) { + final Pid pid = new Pid(); + pid.setValue(sp.getValue()); + pid.setType(sp.getQualifier().getClassid()); + return pid; + } + + private static String fieldValue(final Field f) { + return f != null ? f.getValue() : null; + } + + private static String fieldValue(final List> fl) { + return fl != null && !fl.isEmpty() && fl.get(0) != null ? fl.get(0).getValue() : null; + } + + private static String structPropValue(final List props) { + return props != null && !props.isEmpty() && props.get(0) != null ? props.get(0).getValue() : null; + } + + private static List structPropList(final List props) { + return props != null ? props.stream().map(StructuredProperty::getValue).collect(Collectors.toList()) : new ArrayList<>(); + } } From a73973a74b26fa8a232dd62cfe096272d2acf71d Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 5 Jun 2020 11:43:00 +0200 Subject: [PATCH 2/3] partial implemantation of broker events generation --- dhp-workflows/dhp-broker-events/pom.xml | 2 +- .../dhp/broker/model/EventFactory.java | 14 +- .../AbstractEnrichMissingPublication.java | 2 +- .../dhp/broker/oa/util/ConversionUtils.java | 147 ++++++++++++------ .../dhp/broker/oa/util/UpdateInfo.java | 29 +++- 5 files changed, 128 insertions(+), 66 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index 0e84d99a4..59ff05ed3 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -57,7 +57,7 @@ eu.dnetlib dnet-openaire-broker-common - [2.0.0,3.0.0) + [2.0.1,3.0.0) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 609f206b7..fcecc8369 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -12,7 +12,6 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; -import eu.dnetlib.broker.objects.OpenAireEventPayload; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.KeyValue; @@ -37,14 +36,12 @@ public class EventFactory { final Map map = createMapFromResult(updateInfo); - final String payload = createPayload(updateInfo); - final String eventId = calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString()); res.setEventId(eventId); res.setProducerId(PRODUCER_ID); - res.setPayload(payload); + res.setPayload(updateInfo.asBrokerPayload().toJSON()); res.setMap(map); res.setTopic(updateInfo.getTopicPath()); res.setCreationDate(now); @@ -53,15 +50,6 @@ public class EventFactory { return res; } - private static String createPayload(final UpdateInfo updateInfo) { - final OpenAireEventPayload payload = new OpenAireEventPayload(); - // TODO : use ConversionUtils - - updateInfo.compileHighlight(payload); - - return payload.toJSON(); - } - private static Map createMapFromResult(final UpdateInfo updateInfo) { final Map map = new HashMap<>(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java index 75e77b3c6..074b6043a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java @@ -39,7 +39,7 @@ public abstract class AbstractEnrichMissingPublication .getRight() .stream() .filter(d -> !existingPublications.contains(d.getId())) - .map(ConversionUtils::oafPublicationToBrokerPublication) + .map(ConversionUtils::oafResultToBrokerPublication) .map(i -> generateUpdateInfo(i, source, target)) .collect(Collectors.toList()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 4961fc4e4..d18c46e56 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -5,83 +5,130 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.DocumentHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.Pid; +import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.ExternalReference; import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class ConversionUtils { - public static List oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) { + private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); + + public static List oafInstanceToBrokerInstances(final Instance i) { return i.getUrl().stream().map(url -> { - final Instance r = new Instance(); - r.setUrl(url); - r.setInstancetype(i.getInstancetype().getClassid()); - r.setLicense(BrokerConstants.OPEN_ACCESS); - r.setHostedby(i.getHostedby().getValue()); - return r; + return new eu.dnetlib.broker.objects.Instance() + .setUrl(url) + .setInstancetype(i.getInstancetype().getClassid()) + .setLicense(BrokerConstants.OPEN_ACCESS) + .setHostedby(i.getHostedby().getValue()); }).collect(Collectors.toList()); } public static Pid oafPidToBrokerPid(final StructuredProperty sp) { - final Pid pid = new Pid(); - pid.setValue(sp.getValue()); - pid.setType(sp.getQualifier().getClassid()); - return pid; + return sp != null ? new Pid() + .setValue(sp.getValue()) + .setType(sp.getQualifier().getClassid()) : null; } public static final Pair oafSubjectToPair(final StructuredProperty sp) { - return Pair.of(sp.getQualifier().getClassid(), sp.getValue()); + return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null; } public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { - final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset(); - res.setOriginalId(d.getOriginalId().get(0)); - res.setTitles(structPropList(d.getTitle())); - res.setPids(d.getPid().stream().map(ConversionUtils::structeredPropertyToPid).collect(Collectors.toList())); - res.setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())); - res.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())); - return res; + return d != null ? new eu.dnetlib.broker.objects.Dataset() + .setOriginalId(d.getOriginalId().get(0)) + .setTitles(structPropList(d.getTitle())) + .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) + .setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())) + .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())) + : null; } - public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) { - final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication(); - // TODO This should be reusable - return res; + public static final eu.dnetlib.broker.objects.Publication oafResultToBrokerPublication(final Result result) { + + return result != null ? new eu.dnetlib.broker.objects.Publication() + .setOriginalId(result.getOriginalId().get(0)) + .setTitles(structPropList(result.getTitle())) + .setAbstracts(fieldList(result.getDescription())) + .setLanguage(result.getLanguage().getClassid()) + .setSubjects(structPropList(result.getSubject())) + .setCreators(result.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList())) + .setPublicationdate(result.getDateofcollection()) + .setPublisher(fieldValue(result.getPublisher())) + .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) + .setContributor(fieldList(result.getContributor())) + .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) + .setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())) + .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) + .setInstances(result.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())) + .setExternalReferences(result.getExternalReference().stream().map(ConversionUtils::oafExtRefToBrokerExtRef).collect(Collectors.toList())) + : null; + } + + private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) { + return journal != null ? new eu.dnetlib.broker.objects.Journal() + .setName(journal.getName()) + .setIssn(journal.getIssnPrinted()) + .setEissn(journal.getIssnOnline()) + .setLissn(journal.getIssnLinking()) : null; + } + + private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { + return ref != null ? new eu.dnetlib.broker.objects.ExternalReference() + .setRefidentifier(ref.getRefidentifier()) + .setSitename(ref.getSitename()) + .setType(ref.getQualifier().getClassid()) + .setUrl(ref.getUrl()) + : null; } public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { - final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project(); - res.setTitle(fieldValue(p.getTitle())); - res.setAcronym(fieldValue(p.getAcronym())); - res.setCode(fieldValue(p.getCode())); - res.setFunder(null); // TODO - res.setFundingProgram(null); // TODO - res.setJurisdiction(null); // TODO + if (p == null) { return null; } + + final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() + .setTitle(fieldValue(p.getTitle())) + .setAcronym(fieldValue(p.getAcronym())) + .setCode(fieldValue(p.getCode())); + + final String ftree = fieldValue(p.getFundingtree()); + if (StringUtils.isNotBlank(ftree)) { + try { + final Document fdoc = DocumentHelper.parseText(ftree); + res.setFunder(fdoc.valueOf("/fundingtree/funder/shortname")); + res.setJurisdiction(fdoc.valueOf("/fundingtree/funder/jurisdiction")); + res.setFundingProgram(fdoc.valueOf("//funding_level_0/name")); + } catch (final DocumentException e) { + log.error("Error in record " + p.getId() + ": invalid fundingtree: " + ftree); + } + } + return res; } public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { - final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software(); - res.setName(structPropValue(sw.getTitle())); - res.setDescription(fieldValue(sw.getDescription())); - res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); - res.setLandingPage(fieldValue(sw.getDocumentationUrl())); - return res; - } - - private static Pid structeredPropertyToPid(final StructuredProperty sp) { - final Pid pid = new Pid(); - pid.setValue(sp.getValue()); - pid.setType(sp.getQualifier().getClassid()); - return pid; + return sw != null ? new eu.dnetlib.broker.objects.Software() + .setName(structPropValue(sw.getTitle())) + .setDescription(fieldValue(sw.getDescription())) + .setRepository(fieldValue(sw.getCodeRepositoryUrl())) + .setLandingPage(fieldValue(sw.getDocumentationUrl())) + : null; } private static String fieldValue(final Field f) { @@ -89,14 +136,20 @@ public class ConversionUtils { } private static String fieldValue(final List> fl) { - return fl != null && !fl.isEmpty() && fl.get(0) != null ? fl.get(0).getValue() : null; + return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null; } private static String structPropValue(final List props) { - return props != null && !props.isEmpty() && props.get(0) != null ? props.get(0).getValue() : null; + return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null; + } + + private static List fieldList(final List> fl) { + return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList()) + : new ArrayList<>(); } private static List structPropList(final List props) { - return props != null ? props.stream().map(StructuredProperty::getValue).collect(Collectors.toList()) : new ArrayList<>(); + return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList()) + : new ArrayList<>(); } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 5cc0d371d..07b171080 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -1,12 +1,16 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.util.List; import java.util.function.BiConsumer; import java.util.function.Function; import eu.dnetlib.broker.objects.OpenAireEventPayload; +import eu.dnetlib.broker.objects.Provenance; import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.dhp.broker.model.Topic; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Result; public final class UpdateInfo { @@ -66,12 +70,29 @@ public final class UpdateInfo { return trust; } - public void compileHighlight(final OpenAireEventPayload payload) { - compileHighlight.accept(payload.getHighlight(), getHighlightValue()); - } - public String getHighlightValueAsString() { return highlightToString.apply(getHighlightValue()); } + public OpenAireEventPayload asBrokerPayload() { + + final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource()); + compileHighlight.accept(p, getHighlightValue()); + + final Publication hl = new Publication(); + compileHighlight.accept(hl, getHighlightValue()); + + final String provId = getSource().getOriginalId().stream().findFirst().orElse(null); + final String provRepo = getSource().getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null); + final String provUrl = getSource().getInstance().stream().map(Instance::getUrl).flatMap(List::stream).findFirst().orElse(null);; + + final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl); + + return new OpenAireEventPayload() + .setPublication(p) + .setHighlight(hl) + .setTrust(trust) + .setProvenance(provenance); + } + } From 3576dd186b398cfce22744d35dc490c20e30c57c Mon Sep 17 00:00:00 2001 From: Spyros Zoupanos Date: Fri, 5 Jun 2020 22:29:54 +0300 Subject: [PATCH 3/3] Adding hive timeout as workflow parameter --- .../eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml index 6f6389362..efc301573 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml @@ -21,6 +21,10 @@ hiveJdbcUrl hive server jdbc url + + hive_timeout + the time period, in seconds, after which Hive fails a transaction if a Hive client has not sent a hearbeat. The default value is 300 seconds. + @@ -31,6 +35,10 @@ hive.metastore.uris ${hiveMetastoreUris} + + hive.txn.timeout + ${hive_timeout} +