enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
4 changed files with 199 additions and 124 deletions
Showing only changes of commit 834f139e6e - Show all commits

View File

@ -53,7 +53,7 @@
<dependency> <dependency>
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-broker-common</artifactId> <artifactId>dnet-openaire-broker-common</artifactId>
<version>[3.0.1,4.0.0)</version> <version>[3.0.2,4.0.0)</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -30,11 +30,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.config.DedupConfig;
@ -85,7 +83,9 @@ public class GenerateEventsApplication {
removeOutputDir(spark, eventsPath); removeOutputDir(spark, eventsPath);
// TODO REMOVE THIS // TODO REMOVE THIS
expandResultsWithRelations(spark, graphPath, Publication.class) readPath(spark, graphPath + "/publication", Publication.class)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(eventsPath); .json(eventsPath);
@ -141,15 +141,15 @@ public class GenerateEventsApplication {
final String graphPath, final String graphPath,
final Class<SRC> sourceClass) { final Class<SRC> sourceClass) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class); // final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath( // final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); // spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class); // final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class); // final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class) // final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) // .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache(); // .cache();
final Dataset<OpenaireBrokerResult> r0 = readPath( final Dataset<OpenaireBrokerResult> r0 = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
@ -185,7 +185,6 @@ public class GenerateEventsApplication {
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>() final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
.toColumn(); .toColumn();
;
return sources return sources
.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -13,6 +14,8 @@ import org.dom4j.DocumentHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.broker.objects.TypedValue;
import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Author;
@ -24,6 +27,7 @@ import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
@ -33,133 +37,186 @@ public class ConversionUtils {
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
public static List<eu.dnetlib.broker.objects.Instance> oafInstanceToBrokerInstances(final Instance i) { public static List<eu.dnetlib.broker.objects.Instance> oafInstanceToBrokerInstances(final Instance i) {
return i.getUrl().stream().map(url -> { if (i == null) {
return new eu.dnetlib.broker.objects.Instance() return new ArrayList<>();
.setUrl(url) }
.setInstancetype(i.getInstancetype().getClassid())
.setLicense(BrokerConstants.OPEN_ACCESS) return mappedList(i.getUrl(), url -> {
.setHostedby(i.getHostedby().getValue()); final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance();
}).collect(Collectors.toList()); res.setUrl(url);
res.setInstancetype(classId(i.getInstancetype()));
res.setLicense(BrokerConstants.OPEN_ACCESS);
res.setHostedby(kvValue(i.getHostedby()));
return res;
});
} }
public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) { public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) {
return sp != null ? new TypedValue() return oafStructPropToBrokerTypedValue(sp);
.setValue(sp.getValue()) }
.setType(sp.getQualifier().getClassid()) : null;
public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
} }
public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) { public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
return sp != null ? Pair.of(sp.getQualifier().getClassid(), sp.getValue()) : null; return sp != null ? Pair.of(classId(sp.getQualifier()), sp.getValue()) : null;
} }
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) {
return d != null ? new eu.dnetlib.broker.objects.Dataset() if (d == null) {
.setOriginalId(d.getOriginalId().get(0)) return null;
.setTitle(structPropValue(d.getTitle())) }
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances( final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset();
d res.setOriginalId(first(d.getOriginalId()));
.getInstance() res.setTitle(structPropValue(d.getTitle()));
.stream() res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
.map(ConversionUtils::oafInstanceToBrokerInstances) res.setInstances(flatMappedList(d.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
.flatMap(List::stream) res.setCollectedFrom(mappedFirst(d.getCollectedfrom(), KeyValue::getValue));
.collect(Collectors.toList())) return res;
.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null))
: null;
} }
public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) { public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) {
return p != null ? new eu.dnetlib.broker.objects.Publication() if (p == null) {
.setOriginalId(p.getOriginalId().get(0)) return null;
.setTitle(structPropValue(p.getTitle())) }
.setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances( final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication();
p res.setOriginalId(first(p.getOriginalId()));
.getInstance() res.setTitle(structPropValue(p.getTitle()));
.stream() res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
.map(ConversionUtils::oafInstanceToBrokerInstances) res.setInstances(flatMappedList(p.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
.flatMap(List::stream) res.setCollectedFrom(mappedFirst(p.getCollectedfrom(), KeyValue::getValue));
.collect(Collectors.toList()))
.setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) return res;
: null;
} }
public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) {
if (result == null) {
return null;
}
return result != null ? new OpenaireBrokerResult() final OpenaireBrokerResult res = new OpenaireBrokerResult();
.setOpenaireId(result.getId())
.setOriginalId(result.getOriginalId().get(0)) res.setOpenaireId(result.getId());
.setTypology(result.getResulttype().getClassid()) res.setOriginalId(first(result.getOriginalId()));
.setTitles(structPropList(result.getTitle())) res.setTypology(classId(result.getResulttype()));
.setAbstracts(fieldList(result.getDescription())) res.setTitles(structPropList(result.getTitle()));
.setLanguage(result.getLanguage().getClassid()) res.setAbstracts(fieldList(result.getDescription()));
.setSubjects(structPropTypedList(result.getSubject())) res.setLanguage(classId(result.getLanguage()));
.setCreators( res.setSubjects(structPropTypedList(result.getSubject()));
result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) res.setCreators(mappedList(result.getAuthor(), ConversionUtils::oafAuthorToBrokerAuthor));
.setPublicationdate(result.getDateofacceptance().getValue()) res.setPublicationdate(fieldValue(result.getDateofacceptance()));
.setPublisher(fieldValue(result.getPublisher())) res.setPublisher(fieldValue(result.getPublisher()));
.setEmbargoenddate(fieldValue(result.getEmbargoenddate())) res.setEmbargoenddate(fieldValue(result.getEmbargoenddate()));
.setContributor(fieldList(result.getContributor())) res.setContributor(fieldList(result.getContributor()));
res
.setJournal( .setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null);
.setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey));
.setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue));
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid));
.setInstances( res.setInstances(flatMappedList(result.getInstance(), ConversionUtils::oafInstanceToBrokerInstances));
result res.setExternalReferences(mappedList(result.getExternalReference(), ConversionUtils::oafExtRefToBrokerExtRef));
.getInstance()
.stream() return res;
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(List::stream)
.collect(Collectors.toList()))
.setExternalReferences(
result
.getExternalReference()
.stream()
.map(ConversionUtils::oafExtRefToBrokerExtRef)
.collect(Collectors.toList()))
: null;
} }
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) { private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) {
if (list == null) {
return new ArrayList<>();
}
return list return list
.stream() .stream()
.map( .map(ConversionUtils::oafStructPropToBrokerTypedValue)
p -> new TypedValue()
.setValue(p.getValue())
.setType(p.getQualifier().getClassid()))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static <F, T> List<T> mappedList(final List<F> list, final Function<F, T> func) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(func::apply)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> List<T> flatMappedList(final List<F> list, final Function<F, List<T>> func) {
if (list == null) {
return new ArrayList<>();
}
return list
.stream()
.map(func::apply)
.flatMap(List::stream)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private static <F, T> T mappedFirst(final List<F> list, final Function<F, T> func) {
if (list == null) {
return null;
}
return list
.stream()
.map(func::apply)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) {
return author != null ? new eu.dnetlib.broker.objects.Author() if (author == null) {
.setFullname(author.getFullname()) return null;
.setOrcid( }
author
.getPid() final String pids = author.getPid() != null ? author
.stream() .getPid()
.filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) .stream()
.map(pid -> pid.getValue()) .filter(pid -> pid != null)
.findFirst() .filter(pid -> pid.getQualifier() != null)
.orElse(null)) .filter(pid -> pid.getQualifier().getClassid() != null)
: null; .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid"))
.map(pid -> pid.getValue())
.filter(StringUtils::isNotBlank)
.findFirst()
.orElse(null) : null;
return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids);
} }
private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) { private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) {
return journal != null ? new eu.dnetlib.broker.objects.Journal() if (journal == null) {
.setName(journal.getName()) return null;
.setIssn(journal.getIssnPrinted()) }
.setEissn(journal.getIssnOnline())
.setLissn(journal.getIssnLinking()) : null; final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal();
res.setName(journal.getName());
res.setIssn(journal.getIssnPrinted());
res.setEissn(journal.getIssnOnline());
res.setLissn(journal.getIssnLinking());
return res;
} }
private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
return ref != null ? new eu.dnetlib.broker.objects.ExternalReference() if (ref == null) {
.setRefidentifier(ref.getRefidentifier()) return null;
.setSitename(ref.getSitename()) }
.setType(ref.getQualifier().getClassid())
.setUrl(ref.getUrl()) final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference();
: null; res.setRefidentifier(ref.getRefidentifier());
res.setSitename(ref.getSitename());
res.setType(classId(ref.getQualifier()));
res.setUrl(ref.getUrl());
return res;
} }
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
@ -167,10 +224,10 @@ public class ConversionUtils {
return null; return null;
} }
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project();
.setTitle(fieldValue(p.getTitle())) res.setTitle(fieldValue(p.getTitle()));
.setAcronym(fieldValue(p.getAcronym())) res.setAcronym(fieldValue(p.getAcronym()));
.setCode(fieldValue(p.getCode())); res.setCode(fieldValue(p.getCode()));
final String ftree = fieldValue(p.getFundingtree()); final String ftree = fieldValue(p.getFundingtree());
if (StringUtils.isNotBlank(ftree)) { if (StringUtils.isNotBlank(ftree)) {
@ -188,12 +245,25 @@ public class ConversionUtils {
} }
public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) {
return sw != null ? new eu.dnetlib.broker.objects.Software() if (sw == null) {
.setName(structPropValue(sw.getTitle())) return null;
.setDescription(fieldValue(sw.getDescription())) }
.setRepository(fieldValue(sw.getCodeRepositoryUrl()))
.setLandingPage(fieldValue(sw.getDocumentationUrl())) final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software();
: null; res.setName(structPropValue(sw.getTitle()));
res.setDescription(fieldValue(sw.getDescription()));
res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
res.setLandingPage(fieldValue(sw.getDocumentationUrl()));
return res;
}
private static String first(final List<String> list) {
return list != null && list.size() > 0 ? list.get(0) : null;
}
private static String kvValue(final KeyValue kv) {
return kv != null ? kv.getValue() : null;
} }
private static String fieldValue(final Field<String> f) { private static String fieldValue(final Field<String> f) {
@ -205,6 +275,10 @@ public class ConversionUtils {
: null; : null;
} }
private static String classId(final Qualifier q) {
return q != null ? q.getClassid() : null;
}
private static String structPropValue(final List<StructuredProperty> props) { private static String structPropValue(final List<StructuredProperty> props) {
return props != null return props != null
? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null)

View File

@ -122,13 +122,15 @@ public final class UpdateInfo<T> {
.orElse(null); .orElse(null);
; ;
final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl); final Provenance provenance = new Provenance(provId, provRepo, provUrl);
return new OpenAireEventPayload() final OpenAireEventPayload res = new OpenAireEventPayload();
.setPublication(target) res.setResult(target);
.setHighlight(hl) res.setHighlight(hl);
.setTrust(trust) res.setTrust(trust);
.setProvenance(provenance); res.setProvenance(provenance);
return res;
} }
} }