diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 0e84d99a4..59ff05ed3 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -57,7 +57,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [2.0.0,3.0.0)
+ [2.0.1,3.0.0)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
index 9e5d98644..fcecc8369 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/EventFactory.java
@@ -12,7 +12,6 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
-import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
@@ -37,15 +36,12 @@ public class EventFactory {
final Map map = createMapFromResult(updateInfo);
- final String payload = createPayload(updateInfo);
-
- final String eventId = calculateEventId(
- updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
- updateInfo.getHighlightValueAsString());
+ final String eventId =
+ calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
- res.setPayload(payload);
+ res.setPayload(updateInfo.asBrokerPayload().toJSON());
res.setMap(map);
res.setTopic(updateInfo.getTopicPath());
res.setCreationDate(now);
@@ -54,15 +50,6 @@ public class EventFactory {
return res;
}
- private static String createPayload(final UpdateInfo> updateInfo) {
- final OpenAireEventPayload payload = new OpenAireEventPayload();
- // TODO
-
- updateInfo.compileHighlight(payload);
-
- return payload.toJSON();
- }
-
private static Map createMapFromResult(final UpdateInfo> updateInfo) {
final Map map = new HashMap<>();
@@ -93,17 +80,13 @@ public class EventFactory {
final List subjects = target.getSubject();
if (subjects.size() > 0) {
map
- .put(
- "target_publication_subject_list",
- subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
+ .put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
}
final List authors = target.getAuthor();
if (authors.size() > 0) {
map
- .put(
- "target_publication_author_list",
- authors.stream().map(Author::getFullname).collect(Collectors.toList()));
+ .put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList()));
}
// PROVENANCE INFO
@@ -130,9 +113,7 @@ public class EventFactory {
}
private static long parseDateTolong(final String date) {
- if (StringUtils.isBlank(date)) {
- return -1;
- }
+ if (StringUtils.isBlank(date)) { return -1; }
try {
return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
} catch (final ParseException e) {
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index 045104720..fede6f8bf 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -43,20 +43,21 @@ import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublic
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
+import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
-import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSoftware;
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -177,14 +178,6 @@ public class GenerateEventsApplication {
}
- private static JavaRDD generateRelationEvents(final SparkSession spark,
- final String graphPath,
- final Class sourceClass,
- final Class targetClass) {
- // TODO Auto-generated method stub
- return null;
- }
-
private List generateSimpleEvents(final Collection children) {
final List> list = new ArrayList<>();
@@ -203,6 +196,35 @@ public class GenerateEventsApplication {
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
+ private static JavaRDD generateRelationEvents(final SparkSession spark,
+ final String graphPath,
+ final Class sourceClass,
+ final Class targetClass) {
+
+ final Dataset sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
+ .filter(r -> r.getDataInfo().getDeletedbyinference());
+
+ final Dataset targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
+
+ final Dataset mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
+
+ final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
+ .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
+
+ if (targetClass == Project.class) {
+ // TODO join using: generateProjectsEvents
+ } else if (targetClass == Software.class) {
+ // TODO join using: generateSoftwareEvents
+ } else if (targetClass == Publication.class) {
+ // TODO join using: generatePublicationRelatedEvents
+ } else if (targetClass == eu.dnetlib.dhp.schema.oaf.Dataset.class) {
+ // TODO join using: generateDatasetRelatedEvents
+ }
+
+ return null;
+ }
+
private List generateProjectsEvents(final Collection>> childrenWithProjects) {
final List> list = new ArrayList<>();
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
index 461266d56..3ce29f864 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java
@@ -3,11 +3,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
@@ -22,8 +24,16 @@ public class EnrichMissingProject
@Override
protected List> findUpdates(final Pair> source,
final Pair> target) {
- // TODO
- return Arrays.asList();
+
+ if (source.getRight().isEmpty()) {
+ return Arrays.asList();
+ } else {
+ return target.getRight()
+ .stream()
+ .map(ConversionUtils::oafProjectToBrokerProject)
+ .map(p -> generateUpdateInfo(p, source, target))
+ .collect(Collectors.toList());
+ }
}
@Override
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
index d9bfb62d5..70ab5f71c 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java
@@ -1,13 +1,15 @@
package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
-import java.util.Arrays;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
@@ -21,8 +23,18 @@ public class EnrichMoreProject extends UpdateMatcher>
@Override
protected List> findUpdates(final Pair> source,
final Pair> target) {
- // TODO
- return Arrays.asList();
+
+ final Set existingProjects = source.getRight()
+ .stream()
+ .map(Project::getId)
+ .collect(Collectors.toSet());
+
+ return target.getRight()
+ .stream()
+ .filter(p -> !existingProjects.contains(p.getId()))
+ .map(ConversionUtils::oafProjectToBrokerProject)
+ .map(p -> generateUpdateInfo(p, source, target))
+ .collect(Collectors.toList());
}
@Override
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
index 75e77b3c6..074b6043a 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedPublications/AbstractEnrichMissingPublication.java
@@ -39,7 +39,7 @@ public abstract class AbstractEnrichMissingPublication
.getRight()
.stream()
.filter(d -> !existingPublications.contains(d.getId()))
- .map(ConversionUtils::oafPublicationToBrokerPublication)
+ .map(ConversionUtils::oafResultToBrokerPublication)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
similarity index 72%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
index 4fcba43a4..d0d0f4083 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMissingSoftware.java
@@ -1,13 +1,15 @@
-package eu.dnetlib.dhp.broker.oa.matchers.simple;
+package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
@@ -23,8 +25,16 @@ public class EnrichMissingSoftware
protected List> findUpdates(
final Pair> source,
final Pair> target) {
- // TODO
- return Arrays.asList();
+
+ if (source.getRight().isEmpty()) {
+ return Arrays.asList();
+ } else {
+ return target.getRight()
+ .stream()
+ .map(ConversionUtils::oafSoftwareToBrokerSoftware)
+ .map(p -> generateUpdateInfo(p, source, target))
+ .collect(Collectors.toList());
+ }
}
@Override
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
similarity index 66%
rename from dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java
rename to dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
index a1affff62..455f08468 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreSoftware.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedSoftware/EnrichMoreSoftware.java
@@ -1,13 +1,15 @@
-package eu.dnetlib.dhp.broker.oa.matchers.simple;
+package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
-import java.util.Arrays;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
+import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
@@ -23,8 +25,18 @@ public class EnrichMoreSoftware
protected List> findUpdates(
final Pair> source,
final Pair> target) {
- // TODO
- return Arrays.asList();
+
+ final Set existingSoftwares = source.getRight()
+ .stream()
+ .map(Software::getId)
+ .collect(Collectors.toSet());
+
+ return target.getRight()
+ .stream()
+ .filter(p -> !existingSoftwares.contains(p.getId()))
+ .map(ConversionUtils::oafSoftwareToBrokerSoftware)
+ .map(p -> generateUpdateInfo(p, source, target))
+ .collect(Collectors.toList());
}
@Override
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
index c7e9dcbc1..524846d06 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingOpenAccess.java
@@ -28,16 +28,14 @@ public class EnrichMissingOpenAccess extends UpdateMatcher {
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count();
- if (count > 0) {
- return Arrays.asList();
- }
+ if (count > 0) { return Arrays.asList(); }
return source
.getInstance()
.stream()
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
.map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(s -> s)
+ .flatMap(List::stream)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
index c376da44d..f9c5f81da 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMoreOpenAccess.java
@@ -34,7 +34,7 @@ public class EnrichMoreOpenAccess extends UpdateMatcher {
.stream()
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
.map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(s -> s)
+ .flatMap(List::stream)
.filter(i -> !urls.contains(i.getUrl()))
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index 2f87d0ee7..d18c46e56 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -1,49 +1,155 @@
package eu.dnetlib.dhp.broker.oa.util;
-import java.util.stream.Stream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.DocumentHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.Pid;
+import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.ExternalReference;
+import eu.dnetlib.dhp.schema.oaf.Field;
+import eu.dnetlib.dhp.schema.oaf.Instance;
+import eu.dnetlib.dhp.schema.oaf.Journal;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class ConversionUtils {
- public static Stream oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
+ private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
+
+ public static List oafInstanceToBrokerInstances(final Instance i) {
return i.getUrl().stream().map(url -> {
- final Instance r = new Instance();
- r.setUrl(url);
- r.setInstancetype(i.getInstancetype().getClassid());
- r.setLicense(BrokerConstants.OPEN_ACCESS);
- r.setHostedby(i.getHostedby().getValue());
- return r;
- });
+ return new eu.dnetlib.broker.objects.Instance()
+ .setUrl(url)
+ .setInstancetype(i.getInstancetype().getClassid())
+ .setLicense(BrokerConstants.OPEN_ACCESS)
+ .setHostedby(i.getHostedby().getValue());
+ }).collect(Collectors.toList());
}
public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
- final Pid pid = new Pid();
- pid.setValue(sp.getValue());
- pid.setType(sp.getQualifier().getClassid());
- return pid;
+ return sp != null ? new Pid()
+ .setValue(sp.getValue())
+ .setType(sp.getQualifier().getClassid()) : null;
}
public static final Pair oafSubjectToPair(final StructuredProperty sp) {
- return Pair.of(sp.getQualifier().getClassid(), sp.getValue());
+ return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null;
}
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
- final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
- // TODO
+ return d != null ? new eu.dnetlib.broker.objects.Dataset()
+ .setOriginalId(d.getOriginalId().get(0))
+ .setTitles(structPropList(d.getTitle()))
+ .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
+ .setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList()))
+ .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
+ : null;
+ }
+
+ public static final eu.dnetlib.broker.objects.Publication oafResultToBrokerPublication(final Result result) {
+
+ return result != null ? new eu.dnetlib.broker.objects.Publication()
+ .setOriginalId(result.getOriginalId().get(0))
+ .setTitles(structPropList(result.getTitle()))
+ .setAbstracts(fieldList(result.getDescription()))
+ .setLanguage(result.getLanguage().getClassid())
+ .setSubjects(structPropList(result.getSubject()))
+ .setCreators(result.getAuthor().stream().map(Author::getFullname).collect(Collectors.toList()))
+ .setPublicationdate(result.getDateofcollection())
+ .setPublisher(fieldValue(result.getPublisher()))
+ .setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
+ .setContributor(fieldList(result.getContributor()))
+ .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
+ .setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
+ .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
+ .setInstances(result.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList()))
+ .setExternalReferences(result.getExternalReference().stream().map(ConversionUtils::oafExtRefToBrokerExtRef).collect(Collectors.toList()))
+ : null;
+ }
+
+ private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
+ return journal != null ? new eu.dnetlib.broker.objects.Journal()
+ .setName(journal.getName())
+ .setIssn(journal.getIssnPrinted())
+ .setEissn(journal.getIssnOnline())
+ .setLissn(journal.getIssnLinking()) : null;
+ }
+
+ private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
+ return ref != null ? new eu.dnetlib.broker.objects.ExternalReference()
+ .setRefidentifier(ref.getRefidentifier())
+ .setSitename(ref.getSitename())
+ .setType(ref.getQualifier().getClassid())
+ .setUrl(ref.getUrl())
+ : null;
+ }
+
+ public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
+ if (p == null) { return null; }
+
+ final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
+ .setTitle(fieldValue(p.getTitle()))
+ .setAcronym(fieldValue(p.getAcronym()))
+ .setCode(fieldValue(p.getCode()));
+
+ final String ftree = fieldValue(p.getFundingtree());
+ if (StringUtils.isNotBlank(ftree)) {
+ try {
+ final Document fdoc = DocumentHelper.parseText(ftree);
+ res.setFunder(fdoc.valueOf("/fundingtree/funder/shortname"));
+ res.setJurisdiction(fdoc.valueOf("/fundingtree/funder/jurisdiction"));
+ res.setFundingProgram(fdoc.valueOf("//funding_level_0/name"));
+ } catch (final DocumentException e) {
+ log.error("Error in record " + p.getId() + ": invalid fundingtree: " + ftree);
+ }
+ }
+
return res;
}
- public static final eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication d) {
- final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
- // TODO
- return res;
+ public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
+ return sw != null ? new eu.dnetlib.broker.objects.Software()
+ .setName(structPropValue(sw.getTitle()))
+ .setDescription(fieldValue(sw.getDescription()))
+ .setRepository(fieldValue(sw.getCodeRepositoryUrl()))
+ .setLandingPage(fieldValue(sw.getDocumentationUrl()))
+ : null;
+ }
+
+ private static String fieldValue(final Field f) {
+ return f != null ? f.getValue() : null;
+ }
+
+ private static String fieldValue(final List> fl) {
+ return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null;
+ }
+
+ private static String structPropValue(final List props) {
+ return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null;
+ }
+
+ private static List fieldList(final List> fl) {
+ return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
+ : new ArrayList<>();
+ }
+
+ private static List structPropList(final List props) {
+ return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
+ : new ArrayList<>();
}
}
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index 5cc0d371d..07b171080 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -1,12 +1,16 @@
package eu.dnetlib.dhp.broker.oa.util;
+import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
+import eu.dnetlib.broker.objects.Provenance;
import eu.dnetlib.broker.objects.Publication;
import eu.dnetlib.dhp.broker.model.Topic;
+import eu.dnetlib.dhp.schema.oaf.Instance;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
public final class UpdateInfo {
@@ -66,12 +70,29 @@ public final class UpdateInfo {
return trust;
}
- public void compileHighlight(final OpenAireEventPayload payload) {
- compileHighlight.accept(payload.getHighlight(), getHighlightValue());
- }
-
public String getHighlightValueAsString() {
return highlightToString.apply(getHighlightValue());
}
+ public OpenAireEventPayload asBrokerPayload() {
+
+ final Publication p = ConversionUtils.oafResultToBrokerPublication(getSource());
+ compileHighlight.accept(p, getHighlightValue());
+
+ final Publication hl = new Publication();
+ compileHighlight.accept(hl, getHighlightValue());
+
+ final String provId = getSource().getOriginalId().stream().findFirst().orElse(null);
+ final String provRepo = getSource().getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null);
+ final String provUrl = getSource().getInstance().stream().map(Instance::getUrl).flatMap(List::stream).findFirst().orElse(null);;
+
+ final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);
+
+ return new OpenAireEventPayload()
+ .setPublication(p)
+ .setHighlight(hl)
+ .setTrust(trust)
+ .setProvenance(provenance);
+ }
+
}
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
index 6f6389362..efc301573 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml
@@ -21,6 +21,10 @@
hiveJdbcUrl
hive server jdbc url
+
+ hive_timeout
+ the time period, in seconds, after which Hive fails a transaction if a Hive client has not sent a heartbeat. The default value is 300 seconds.
+
@@ -31,6 +35,10 @@
hive.metastore.uris
${hiveMetastoreUris}
+
+ hive.txn.timeout
+ ${hive_timeout}
+