diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java index 9e5d986448..609f206b74 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java @@ -39,9 +39,8 @@ public class EventFactory { final String payload = createPayload(updateInfo); - final String eventId = calculateEventId( - updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), - updateInfo.getHighlightValueAsString()); + final String eventId = + calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString()); res.setEventId(eventId); res.setProducerId(PRODUCER_ID); @@ -56,7 +55,7 @@ public class EventFactory { private static String createPayload(final UpdateInfo updateInfo) { final OpenAireEventPayload payload = new OpenAireEventPayload(); - // TODO + // TODO : use ConversionUtils updateInfo.compileHighlight(payload); @@ -93,17 +92,13 @@ public class EventFactory { final List subjects = target.getSubject(); if (subjects.size() > 0) { map - .put( - "target_publication_subject_list", - subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); + .put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); } final List authors = target.getAuthor(); if (authors.size() > 0) { map - .put( - "target_publication_author_list", - authors.stream().map(Author::getFullname).collect(Collectors.toList())); + .put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList())); } // PROVENANCE INFO @@ -130,9 +125,7 @@ public class EventFactory { } private static long parseDateTolong(final String date) { - if (StringUtils.isBlank(date)) { - return -1; - } + if (StringUtils.isBlank(date)) { return -1; } try { return DateUtils.parseDate(date, DATE_PATTERNS).getTime(); } catch (final ParseException e) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 045104720c..fede6f8bfb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -43,20 +43,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublic import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy; import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo; import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences; +import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware; +import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; -import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -177,14 +178,6 @@ public class GenerateEventsApplication { } - private static JavaRDD generateRelationEvents(final SparkSession spark, - final String graphPath, - final Class sourceClass, - final Class targetClass) { - // TODO Auto-generated method stub - return null; - } - private List generateSimpleEvents(final Collection children) { final List> list = new ArrayList<>(); @@ -203,6 +196,35 @@ public class GenerateEventsApplication { return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); } + private static JavaRDD generateRelationEvents(final SparkSession spark, + final String graphPath, + final Class sourceClass, + final Class targetClass) { + + final Dataset sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) + .filter(r -> r.getDataInfo().getDeletedbyinference()); + + final Dataset targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); + + final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + + final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + + if (targetClass == Project.class) { + // TODO join using: generateProjectsEvents + } else if (targetClass == Software.class) { + // TODO join using: generateSoftwareEvents + } else if (targetClass == Publication.class) { + // TODO join using: generatePublicationRelatedEvents + } else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) { + // TODO join using: generateDatasetRelatedEvents + } + + return null; + } + private List generateProjectsEvents(final Collection>> childrenWithProjects) { final List> list = new ArrayList<>(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index 461266d567..3ce29f864b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -3,11 +3,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; @@ -22,8 +24,16 @@ public class EnrichMissingProject @Override protected List> findUpdates(final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + if (source.getRight().isEmpty()) { + return Arrays.asList(); + } else { + return target.getRight() + .stream() + .map(ConversionUtils::oafProjectToBrokerProject) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); + } } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index d9bfb62d59..70ab5f71c9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -1,13 +1,15 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects; -import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; @@ -21,8 +23,18 @@ public class EnrichMoreProject extends UpdateMatcher> @Override protected List> findUpdates(final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + final Set existingProjects = source.getRight() + .stream() + .map(Project::getId) + .collect(Collectors.toSet()); + + return target.getRight() + .stream() + .filter(p -> !existingProjects.contains(p.getId())) + .map(ConversionUtils::oafProjectToBrokerProject) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java similarity index 72% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java index 4fcba43a40..d0d0f4083b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java @@ -1,13 +1,15 @@ -package eu.dnetlib.dhp.broker.oa.matchers.simple; +package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; @@ -23,8 +25,16 @@ public class EnrichMissingSoftware protected List> findUpdates( final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + if (source.getRight().isEmpty()) { + return Arrays.asList(); + } else { + return target.getRight() + .stream() + .map(ConversionUtils::oafSoftwareToBrokerSoftware) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); + } } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java similarity index 66% rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java index a1affff623..455f084681 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java @@ -1,13 +1,15 @@ -package eu.dnetlib.dhp.broker.oa.matchers.simple; +package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware; -import java.util.Arrays; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Software; @@ -23,8 +25,18 @@ public class EnrichMoreSoftware protected List> findUpdates( final Pair> source, final Pair> target) { - // TODO - return Arrays.asList(); + + final Set existingSoftwares = source.getRight() + .stream() + .map(Software::getId) + .collect(Collectors.toSet()); + + return target.getRight() + .stream() + .filter(p -> !existingSoftwares.contains(p.getId())) + .map(ConversionUtils::oafSoftwareToBrokerSoftware) + .map(p -> generateUpdateInfo(p, source, target)) + .collect(Collectors.toList()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java index c7e9dcbc11..524846d065 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java @@ -28,16 +28,14 @@ public class EnrichMissingOpenAccess extends UpdateMatcher { .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .count(); - if (count > 0) { - return Arrays.asList(); - } + if (count > 0) { return Arrays.asList(); } return source .getInstance() .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(s -> s) + .flatMap(List::stream) .map(i -> generateUpdateInfo(i, source, target)) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java index c376da44d2..f9c5f81da1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java @@ -34,7 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher { .stream() .filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS)) .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(s -> s) + .flatMap(List::stream) .filter(i -> !urls.contains(i.getUrl())) .map(i -> generateUpdateInfo(i, source, target)) .collect(Collectors.toList()); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 2f87d0ee76..4961fc4e44 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -1,19 +1,25 @@ package eu.dnetlib.dhp.broker.oa.util; -import java.util.stream.Stream; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.Pid; import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class ConversionUtils { - public static Stream oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) { + public static List oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) { return i.getUrl().stream().map(url -> { final Instance r = new Instance(); r.setUrl(url); @@ -21,7 +27,7 @@ public class ConversionUtils { r.setLicense(BrokerConstants.OPEN_ACCESS); r.setHostedby(i.getHostedby().getValue()); return r; - }); + }).collect(Collectors.toList()); } public static Pid oafPidToBrokerPid(final StructuredProperty sp) { @@ -37,13 +43,60 @@ public class ConversionUtils { public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset(); - // TODO + res.setOriginalId(d.getOriginalId().get(0)); + res.setTitles(structPropList(d.getTitle())); + res.setPids(d.getPid().stream().map(ConversionUtils::structeredPropertyToPid).collect(Collectors.toList())); + res.setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())); + res.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())); return res; } public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) { final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication(); - // TODO + // TODO This should be reusable return res; } + + public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { + final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project(); + res.setTitle(fieldValue(p.getTitle())); + res.setAcronym(fieldValue(p.getAcronym())); + res.setCode(fieldValue(p.getCode())); + res.setFunder(null); // TODO + res.setFundingProgram(null); // TODO + res.setJurisdiction(null); // TODO + return res; + } + + public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { + final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software(); + res.setName(structPropValue(sw.getTitle())); + res.setDescription(fieldValue(sw.getDescription())); + res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); + res.setLandingPage(fieldValue(sw.getDocumentationUrl())); + return res; + } + + private static Pid structeredPropertyToPid(final StructuredProperty sp) { + final Pid pid = new Pid(); + pid.setValue(sp.getValue()); + pid.setType(sp.getQualifier().getClassid()); + return pid; + } + + private static String fieldValue(final Field f) { + return f != null ? f.getValue() : null; + } + + private static String fieldValue(final List> fl) { + return fl != null && !fl.isEmpty() && fl.get(0) != null ? fl.get(0).getValue() : null; + } + + private static String structPropValue(final List props) { + return props != null && !props.isEmpty() && props.get(0) != null ? props.get(0).getValue() : null; + } + + private static List structPropList(final List props) { + return props != null ? props.stream().map(StructuredProperty::getValue).collect(Collectors.toList()) : new ArrayList<>(); + } }