diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index cd3257991..f943ac93a 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -53,7 +53,7 @@
eu.dnetlib
dnet-openaire-broker-common
- [3.0.1,4.0.0)
+ [3.0.2,4.0.0)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
index 3357710f0..ae313813d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java
@@ -30,11 +30,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
@@ -85,7 +83,9 @@ public class GenerateEventsApplication {
removeOutputDir(spark, eventsPath);
// TODO REMOVE THIS
- expandResultsWithRelations(spark, graphPath, Publication.class)
+ readPath(spark, graphPath + "/publication", Publication.class)
+ .filter(r -> r.getDataInfo().getDeletedbyinference())
+ .map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class))
.write()
.mode(SaveMode.Overwrite)
.json(eventsPath);
@@ -141,15 +141,15 @@ public class GenerateEventsApplication {
final String graphPath,
final Class sourceClass) {
- final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
- final Dataset datasets = readPath(
- spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
- final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
- final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
+ // final Dataset projects = readPath(spark, graphPath + "/project", Project.class);
+ // final Dataset datasets = readPath(
+ // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
+ // final Dataset softwares = readPath(spark, graphPath + "/software", Software.class);
+ // final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class);
- final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
- .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
- .cache();
+ // final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class)
+ // .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
+ // .cache();
final Dataset r0 = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
@@ -185,7 +185,6 @@ public class GenerateEventsApplication {
final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator()
.toColumn();
- ;
return sources
.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index d04ef45a0..d8f9dffbe 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -13,6 +14,8 @@ import org.dom4j.DocumentHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
+
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.schema.oaf.Author;
@@ -24,6 +27,7 @@ import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
@@ -33,133 +37,186 @@ public class ConversionUtils {
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
+ /**
+ * Converts an OAF instance into broker instances, one for each URL returned by {@code i.getUrl()}.
+ * Every produced instance carries the {@code BrokerConstants.OPEN_ACCESS} license.
+ *
+ * @param i the OAF instance; may be {@code null}
+ * @return the converted instances; an empty list when {@code i} is {@code null}
+ */
public static List oafInstanceToBrokerInstances(final Instance i) {
- return i.getUrl().stream().map(url -> {
- return new eu.dnetlib.broker.objects.Instance()
- .setUrl(url)
- .setInstancetype(i.getInstancetype().getClassid())
- .setLicense(BrokerConstants.OPEN_ACCESS)
- .setHostedby(i.getHostedby().getValue());
- }).collect(Collectors.toList());
+ if (i == null) {
+ return new ArrayList<>();
+ }
+
+ // Both values are identical for every URL, so resolve them (null-safely) once up front.
+ final String instanceType = classId(i.getInstancetype());
+ final String hostedBy = kvValue(i.getHostedby());
+
+ return mappedList(i.getUrl(), url -> {
+ final eu.dnetlib.broker.objects.Instance brokerInstance = new eu.dnetlib.broker.objects.Instance();
+ brokerInstance.setUrl(url);
+ brokerInstance.setInstancetype(instanceType);
+ brokerInstance.setLicense(BrokerConstants.OPEN_ACCESS);
+ brokerInstance.setHostedby(hostedBy);
+ return brokerInstance;
+ });
}
+ /**
+ * Converts an OAF pid into a broker {@code TypedValue} by delegating to
+ * {@code oafStructPropToBrokerTypedValue}.
+ *
+ * @param sp the pid to convert; may be {@code null}
+ * @return the converted value, or {@code null} when {@code sp} is {@code null}
+ */
public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) {
- return sp != null ? new TypedValue()
- .setValue(sp.getValue())
- .setType(sp.getQualifier().getClassid()) : null;
+ return oafStructPropToBrokerTypedValue(sp);
+ }
+
+ /**
+ * Builds a broker {@code TypedValue} from a structured property, using the class id of its
+ * qualifier (resolved null-safely) as first constructor argument and its value as second.
+ *
+ * @param sp the structured property; may be {@code null}
+ * @return the typed value, or {@code null} when {@code sp} is {@code null}
+ */
+ public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
+ if (sp == null) {
+ return null;
+ }
+ return new TypedValue(classId(sp.getQualifier()), sp.getValue());
}
+ /**
+ * Turns a subject into a pair made of the class id of its qualifier (resolved null-safely)
+ * and its value.
+ *
+ * @param sp the subject; may be {@code null}
+ * @return the pair, or {@code null} when {@code sp} is {@code null}
+ */
public static final Pair oafSubjectToPair(final StructuredProperty sp) {
- return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null;
+ if (sp == null) {
+ return null;
+ }
+ return Pair.of(classId(sp.getQualifier()), sp.getValue());
}
+ /**
+ * Converts an OAF dataset into its broker counterpart. Only the first original id, the first
+ * non-blank title, the pids, the instances and the first non-null "collected from" value are copied.
+ *
+ * @param d the OAF dataset; may be {@code null}
+ * @return the broker dataset, or {@code null} when {@code d} is {@code null}
+ */
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
- return d != null ? new eu.dnetlib.broker.objects.Dataset()
- .setOriginalId(d.getOriginalId().get(0))
- .setTitle(structPropValue(d.getTitle()))
- .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
- .setInstances(
- d
- .getInstance()
- .stream()
- .map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(List::stream)
- .collect(Collectors.toList()))
- .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
- : null;
+ if (d == null) {
+ return null;
+ }
+
+ final eu.dnetlib.broker.objects.Dataset brokerDataset = new eu.dnetlib.broker.objects.Dataset();
+
+ // Identification
+ brokerDataset.setOriginalId(first(d.getOriginalId()));
+ brokerDataset.setTitle(structPropValue(d.getTitle()));
+ brokerDataset.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
+
+ // Access and provenance
+ brokerDataset.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+ brokerDataset.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
+
+ return brokerDataset;
}
+ /**
+ * Converts an OAF publication into its broker counterpart. Only the first original id, the first
+ * non-blank title, the pids, the instances and the first non-null "collected from" value are copied.
+ *
+ * @param p the OAF publication; may be {@code null}
+ * @return the broker publication, or {@code null} when {@code p} is {@code null}
+ */
public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) {
- return p != null ? new eu.dnetlib.broker.objects.Publication()
- .setOriginalId(p.getOriginalId().get(0))
- .setTitle(structPropValue(p.getTitle()))
- .setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
- .setInstances(
- p
- .getInstance()
- .stream()
- .map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(List::stream)
- .collect(Collectors.toList()))
- .setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
- : null;
+ if (p == null) {
+ return null;
+ }
+
+ final eu.dnetlib.broker.objects.Publication brokerPublication = new eu.dnetlib.broker.objects.Publication();
+
+ // Identification
+ brokerPublication.setOriginalId(first(p.getOriginalId()));
+ brokerPublication.setTitle(structPropValue(p.getTitle()));
+ brokerPublication.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
+
+ // Access and provenance
+ brokerPublication.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+ brokerPublication.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
+
+ return brokerPublication;
}
+ /**
+ * Converts an OAF result into an {@code OpenaireBrokerResult}, copying its fields one by one through
+ * the null-tolerant helpers of this class ({@code first}, {@code classId}, {@code fieldValue},
+ * {@code mappedList}, {@code flatMappedList}, {@code mappedFirst}, ...).
+ *
+ * @param result the OAF result; may be {@code null}
+ * @return the broker result, or {@code null} when {@code result} is {@code null}
+ */
public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
+ if (result == null) {
+ return null;
+ }
- return result != null ? new OpenaireBrokerResult()
- .setOpenaireId(result.getId())
- .setOriginalId(result.getOriginalId().get(0))
- .setTypology(result.getResulttype().getClassid())
- .setTitles(structPropList(result.getTitle()))
- .setAbstracts(fieldList(result.getDescription()))
- .setLanguage(result.getLanguage().getClassid())
- .setSubjects(structPropTypedList(result.getSubject()))
- .setCreators(
- result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList()))
- .setPublicationdate(result.getDateofacceptance().getValue())
- .setPublisher(fieldValue(result.getPublisher()))
- .setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
- .setContributor(fieldList(result.getContributor()))
+ final OpenaireBrokerResult res = new OpenaireBrokerResult();
+
+ res.setOpenaireId(result.getId());
+ res.setOriginalId(first(result.getOriginalId()));
+ res.setTypology(classId(result.getResulttype()));
+ res.setTitles(structPropList(result.getTitle()));
+ res.setAbstracts(fieldList(result.getDescription()));
+ res.setLanguage(classId(result.getLanguage()));
+ res.setSubjects(structPropTypedList(result.getSubject()));
+ res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor));
+ res.setPublicationdate(fieldValue(result.getDateofacceptance()));
+ res.setPublisher(fieldValue(result.getPublisher()));
+ res.setEmbargoenddate(fieldValue(result.getEmbargoenddate()));
+ res.setContributor(fieldList(result.getContributor()));
+ // The journal is read only when the result is a Publication; otherwise it is set to null.
+ res
.setJournal(
- result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
- .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null))
- .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
- .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
- .setInstances(
- result
- .getInstance()
- .stream()
- .map(ConversionUtils::oafInstanceToBrokerInstances)
- .flatMap(List::stream)
- .collect(Collectors.toList()))
- .setExternalReferences(
- result
- .getExternalReference()
- .stream()
- .map(ConversionUtils::oafExtRefToBrokerExtRef)
- .collect(Collectors.toList()))
- : null;
+ result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
+ // Id and name are each taken from the first "collected from" entry with a non-null key / value.
+ res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
+ res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
+ res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
+ res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
+ res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
+
+ return res;
}
+ /**
+ * Converts a list of structured properties into broker typed values.
+ * Null elements of the input produce no entry in the output, consistently with {@code mappedList}.
+ *
+ * @param list the structured properties; may be {@code null}
+ * @return the typed values; an empty list when {@code list} is {@code null}
+ */
private static List structPropTypedList(final List list) {
+ if (list == null) {
+ return new ArrayList<>();
+ }
+
return list
.stream()
- .map(
- p -> new TypedValue()
- .setValue(p.getValue())
- .setType(p.getQualifier().getClassid()))
+ .map(ConversionUtils::oafStructPropToBrokerTypedValue)
+ // the converter returns null for null elements: drop them instead of emitting null entries
+ .filter(Objects::nonNull)
.collect(Collectors.toList());
}
+ /**
+ * Applies {@code func} to every element of {@code list} and collects the non-null results.
+ *
+ * @param <F> the type of the input elements
+ * @param <T> the type of the converted elements
+ * @param list the elements to convert; may be {@code null}
+ * @param func the conversion applied to each element
+ * @return the converted elements, without nulls; an empty list when {@code list} is {@code null}
+ */
+ private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
+ if (list == null) {
+ return new ArrayList<>();
+ }
+
+ return list
+ .stream()
+ .map(func::apply)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Applies {@code func} to every element of {@code list} and concatenates the returned lists,
+ * skipping null lists and null items.
+ *
+ * @param <F> the type of the input elements
+ * @param <T> the type of the converted elements
+ * @param list the elements to convert; may be {@code null}
+ * @param func the conversion producing a list of results for each element
+ * @return the concatenated results, without nulls; an empty list when {@code list} is {@code null}
+ */
+ private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
+ if (list == null) {
+ return new ArrayList<>();
+ }
+
+ return list
+ .stream()
+ .map(func::apply)
+ // a null list returned by func would make List::stream throw a NullPointerException
+ .filter(Objects::nonNull)
+ .flatMap(List::stream)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Applies {@code func} to the elements of {@code list} and returns the first non-null result.
+ *
+ * @param <F> the type of the input elements
+ * @param <T> the type of the converted element
+ * @param list the elements to convert; may be {@code null}
+ * @param func the conversion applied to each element
+ * @return the first non-null converted value, or {@code null} when {@code list} is {@code null}
+ * or no element yields a non-null value
+ */
+ private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
+ if (list == null) {
+ return null;
+ }
+
+ return list
+ .stream()
+ .map(func::apply)
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElse(null);
+ }
+
+ /**
+ * Converts an OAF author into a broker author made of the full name and the first non-blank pid
+ * value whose qualifier class id is "orcid" (case-insensitive).
+ *
+ * @param author the OAF author; may be {@code null}
+ * @return the broker author, or {@code null} when {@code author} is {@code null}
+ */
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
- return author != null ? new eu.dnetlib.broker.objects.Author()
- .setFullname(author.getFullname())
- .setOrcid(
- author
- .getPid()
- .stream()
- .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
- .map(pid -> pid.getValue())
- .findFirst()
- .orElse(null))
- : null;
+ if (author == null) {
+ return null;
+ }
+
+ // No pids at all: the author is created without an ORCID.
+ if (author.getPid() == null) {
+ return new eu.dnetlib.broker.objects.Author(author.getFullname(), null);
+ }
+
+ final String orcid = author
+ .getPid()
+ .stream()
+ .filter(pid -> pid != null && pid.getQualifier() != null && pid.getQualifier().getClassid() != null)
+ .filter(pid -> "orcid".equalsIgnoreCase(pid.getQualifier().getClassid()))
+ .map(pid -> pid.getValue())
+ .filter(StringUtils::isNotBlank)
+ .findFirst()
+ .orElse(null);
+
+ return new eu.dnetlib.broker.objects.Author(author.getFullname(), orcid);
}
+ /**
+ * Converts an OAF journal into a broker journal, copying the name and the printed, online and
+ * linking ISSNs (to {@code issn}, {@code eissn} and {@code lissn} respectively).
+ *
+ * @param journal the OAF journal; may be {@code null}
+ * @return the broker journal, or {@code null} when {@code journal} is {@code null}
+ */
private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
- return journal != null ? new eu.dnetlib.broker.objects.Journal()
- .setName(journal.getName())
- .setIssn(journal.getIssnPrinted())
- .setEissn(journal.getIssnOnline())
- .setLissn(journal.getIssnLinking()) : null;
+ if (journal == null) {
+ return null;
+ }
+
+ final eu.dnetlib.broker.objects.Journal brokerJournal = new eu.dnetlib.broker.objects.Journal();
+ brokerJournal.setName(journal.getName());
+ brokerJournal.setIssn(journal.getIssnPrinted());
+ brokerJournal.setEissn(journal.getIssnOnline());
+ brokerJournal.setLissn(journal.getIssnLinking());
+ return brokerJournal;
}
+ /**
+ * Converts an OAF external reference into a broker external reference, copying the reference
+ * identifier, the site name, the URL and, as type, the class id of the qualifier (resolved null-safely).
+ *
+ * @param ref the OAF external reference; may be {@code null}
+ * @return the broker external reference, or {@code null} when {@code ref} is {@code null}
+ */
private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
- return ref != null ? new eu.dnetlib.broker.objects.ExternalReference()
- .setRefidentifier(ref.getRefidentifier())
- .setSitename(ref.getSitename())
- .setType(ref.getQualifier().getClassid())
- .setUrl(ref.getUrl())
- : null;
+ if (ref == null) {
+ return null;
+ }
+
+ final eu.dnetlib.broker.objects.ExternalReference brokerRef = new eu.dnetlib.broker.objects.ExternalReference();
+ brokerRef.setRefidentifier(ref.getRefidentifier());
+ brokerRef.setSitename(ref.getSitename());
+ brokerRef.setType(classId(ref.getQualifier()));
+ brokerRef.setUrl(ref.getUrl());
+ return brokerRef;
}
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
@@ -167,10 +224,10 @@ public class ConversionUtils {
return null;
}
- final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
- .setTitle(fieldValue(p.getTitle()))
- .setAcronym(fieldValue(p.getAcronym()))
- .setCode(fieldValue(p.getCode()));
+ final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project();
+ res.setTitle(fieldValue(p.getTitle()));
+ res.setAcronym(fieldValue(p.getAcronym()));
+ res.setCode(fieldValue(p.getCode()));
final String ftree = fieldValue(p.getFundingtree());
if (StringUtils.isNotBlank(ftree)) {
@@ -188,12 +245,25 @@ public class ConversionUtils {
}
+ /**
+ * Converts an OAF software into a broker software: the first non-blank title becomes the name,
+ * the code repository URL becomes the repository and the documentation URL becomes the landing page.
+ *
+ * @param sw the OAF software; may be {@code null}
+ * @return the broker software, or {@code null} when {@code sw} is {@code null}
+ */
public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
- return sw != null ? new eu.dnetlib.broker.objects.Software()
- .setName(structPropValue(sw.getTitle()))
- .setDescription(fieldValue(sw.getDescription()))
- .setRepository(fieldValue(sw.getCodeRepositoryUrl()))
- .setLandingPage(fieldValue(sw.getDocumentationUrl()))
- : null;
+ if (sw == null) {
+ return null;
+ }
+
+ final eu.dnetlib.broker.objects.Software brokerSoftware = new eu.dnetlib.broker.objects.Software();
+ brokerSoftware.setName(structPropValue(sw.getTitle()));
+ brokerSoftware.setDescription(fieldValue(sw.getDescription()));
+ brokerSoftware.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
+ brokerSoftware.setLandingPage(fieldValue(sw.getDocumentationUrl()));
+ return brokerSoftware;
+ }
+
+ /**
+ * Returns the first element of a list of strings.
+ *
+ * @param list the list; may be {@code null} or empty
+ * @return the element at index 0, or {@code null} when {@code list} is {@code null} or empty
+ */
+ private static String first(final List<String> list) {
+ return list == null || list.isEmpty() ? null : list.get(0);
+ }
+
+ /**
+ * Null-safe accessor for the value of a {@code KeyValue}.
+ *
+ * @param kv the key/value pair; may be {@code null}
+ * @return {@code kv.getValue()}, or {@code null} when {@code kv} is {@code null}
+ */
+ private static String kvValue(final KeyValue kv) {
+ if (kv == null) {
+ return null;
+ }
+ return kv.getValue();
}
private static String fieldValue(final Field f) {
@@ -205,6 +275,10 @@ public class ConversionUtils {
: null;
}
+ /**
+ * Null-safe accessor for the class id of a {@code Qualifier}.
+ *
+ * @param q the qualifier; may be {@code null}
+ * @return {@code q.getClassid()}, or {@code null} when {@code q} is {@code null}
+ */
+ private static String classId(final Qualifier q) {
+ if (q == null) {
+ return null;
+ }
+ return q.getClassid();
+ }
+
private static String structPropValue(final List props) {
return props != null
? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null)
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
index fca9cf89e..2c4bda53d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java
@@ -122,13 +122,15 @@ public final class UpdateInfo {
.orElse(null);
;
- final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);
+ final Provenance provenance = new Provenance(provId, provRepo, provUrl);
- return new OpenAireEventPayload()
- .setPublication(target)
- .setHighlight(hl)
- .setTrust(trust)
- .setProvenance(provenance);
+ final OpenAireEventPayload res = new OpenAireEventPayload();
+ res.setResult(target);
+ res.setHighlight(hl);
+ res.setTrust(trust);
+ res.setProvenance(provenance);
+
+ return res;
}
}