enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
10 changed files with 102 additions and 49 deletions
Showing only changes of commit e34e7d6728 - Show all commits

View File

@ -36,8 +36,9 @@ public class EventFactory {
final Map<String, Object> map = createMapFromResult(updateInfo); final Map<String, Object> map = createMapFromResult(updateInfo);
final String eventId = final String eventId = calculateEventId(
calculateEventId(updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0), updateInfo.getHighlightValueAsString()); updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
updateInfo.getHighlightValueAsString());
res.setEventId(eventId); res.setEventId(eventId);
res.setProducerId(PRODUCER_ID); res.setProducerId(PRODUCER_ID);
@ -80,13 +81,17 @@ public class EventFactory {
final List<StructuredProperty> subjects = target.getSubject(); final List<StructuredProperty> subjects = target.getSubject();
if (subjects.size() > 0) { if (subjects.size() > 0) {
map map
.put("target_publication_subject_list", subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList())); .put(
"target_publication_subject_list",
subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
} }
final List<Author> authors = target.getAuthor(); final List<Author> authors = target.getAuthor();
if (authors.size() > 0) { if (authors.size() > 0) {
map map
.put("target_publication_author_list", authors.stream().map(Author::getFullname).collect(Collectors.toList())); .put(
"target_publication_author_list",
authors.stream().map(Author::getFullname).collect(Collectors.toList()));
} }
// PROVENANCE INFO // PROVENANCE INFO
@ -113,7 +118,9 @@ public class EventFactory {
} }
private static long parseDateTolong(final String date) { private static long parseDateTolong(final String date) {
if (StringUtils.isBlank(date)) { return -1; } if (StringUtils.isBlank(date)) {
return -1;
}
try { try {
return DateUtils.parseDate(date, DATE_PATTERNS).getTime(); return DateUtils.parseDate(date, DATE_PATTERNS).getTime();
} catch (final ParseException e) { } catch (final ParseException e) {

View File

@ -87,31 +87,24 @@ public class GenerateEventsApplication {
private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware(); private static final UpdateMatcher<Pair<Result, List<Software>>, ?> enrichMoreSoftware = new EnrichMoreSoftware();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo(); private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMisissingPublicationIsRelatedTo = new EnrichMissingPublicationIsRelatedTo();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsReferencedBy = new EnrichMissingPublicationIsReferencedBy();
new EnrichMissingPublicationIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences(); private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationReferences = new EnrichMissingPublicationReferences();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedTo = new EnrichMissingPublicationIsSupplementedTo();
new EnrichMissingPublicationIsSupplementedTo(); private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy = new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<Publication>>, ?> enrichMissingPublicationIsSupplementedBy =
new EnrichMissingPublicationIsSupplementedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMisissingDatasetIsRelatedTo = new EnrichMissingDatasetIsRelatedTo();
new EnrichMissingDatasetIsRelatedTo(); private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = new EnrichMissingDatasetIsReferencedBy();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsReferencedBy = private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = new EnrichMissingDatasetReferences();
new EnrichMissingDatasetIsReferencedBy(); private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo = new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetReferences = private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy = new EnrichMissingDatasetIsSupplementedBy();
new EnrichMissingDatasetReferences();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedTo =
new EnrichMissingDatasetIsSupplementedTo();
private static final UpdateMatcher<Pair<Result, List<eu.dnetlib.dhp.schema.oaf.Dataset>>, ?> enrichMissingDatasetIsSupplementedBy =
new EnrichMissingDatasetIsSupplementedBy();
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(GenerateEventsApplication.class .toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json"))); .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -156,7 +149,8 @@ public class GenerateEventsApplication {
final String graphPath, final String graphPath,
final Class<R> resultClazz) { final Class<R> resultClazz) {
final Dataset<R> results = readPath(spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz) final Dataset<R> results = readPath(
spark, graphPath + "/" + resultClazz.getSimpleName().toLowerCase(), resultClazz)
.filter(r -> r.getDataInfo().getDeletedbyinference()); .filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
@ -196,15 +190,18 @@ public class GenerateEventsApplication {
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList()); return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
} }
private static <SRC extends Result, TRG extends OafEntity> JavaRDD<Event> generateRelationEvents(final SparkSession spark, private static <SRC extends Result, TRG extends OafEntity> JavaRDD<Event> generateRelationEvents(
final SparkSession spark,
final String graphPath, final String graphPath,
final Class<SRC> sourceClass, final Class<SRC> sourceClass,
final Class<TRG> targetClass) { final Class<TRG> targetClass) {
final Dataset<SRC> sources = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass) final Dataset<SRC> sources = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> r.getDataInfo().getDeletedbyinference()); .filter(r -> r.getDataInfo().getDeletedbyinference());
final Dataset<TRG> targets = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass); final Dataset<TRG> targets = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), targetClass);
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class) final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));

View File

@ -28,7 +28,8 @@ public class EnrichMissingProject
if (source.getRight().isEmpty()) { if (source.getRight().isEmpty()) {
return Arrays.asList(); return Arrays.asList();
} else { } else {
return target.getRight() return target
.getRight()
.stream() .stream()
.map(ConversionUtils::oafProjectToBrokerProject) .map(ConversionUtils::oafProjectToBrokerProject)
.map(p -> generateUpdateInfo(p, source, target)) .map(p -> generateUpdateInfo(p, source, target))

View File

@ -24,12 +24,14 @@ public class EnrichMoreProject extends UpdateMatcher<Pair<Result, List<Project>>
protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source, protected List<UpdateInfo<eu.dnetlib.broker.objects.Project>> findUpdates(final Pair<Result, List<Project>> source,
final Pair<Result, List<Project>> target) { final Pair<Result, List<Project>> target) {
final Set<String> existingProjects = source.getRight() final Set<String> existingProjects = source
.getRight()
.stream() .stream()
.map(Project::getId) .map(Project::getId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return target.getRight() return target
.getRight()
.stream() .stream()
.filter(p -> !existingProjects.contains(p.getId())) .filter(p -> !existingProjects.contains(p.getId()))
.map(ConversionUtils::oafProjectToBrokerProject) .map(ConversionUtils::oafProjectToBrokerProject)

View File

@ -29,7 +29,8 @@ public class EnrichMissingSoftware
if (source.getRight().isEmpty()) { if (source.getRight().isEmpty()) {
return Arrays.asList(); return Arrays.asList();
} else { } else {
return target.getRight() return target
.getRight()
.stream() .stream()
.map(ConversionUtils::oafSoftwareToBrokerSoftware) .map(ConversionUtils::oafSoftwareToBrokerSoftware)
.map(p -> generateUpdateInfo(p, source, target)) .map(p -> generateUpdateInfo(p, source, target))

View File

@ -26,12 +26,14 @@ public class EnrichMoreSoftware
final Pair<Result, List<Software>> source, final Pair<Result, List<Software>> source,
final Pair<Result, List<Software>> target) { final Pair<Result, List<Software>> target) {
final Set<String> existingSoftwares = source.getRight() final Set<String> existingSoftwares = source
.getRight()
.stream() .stream()
.map(Software::getId) .map(Software::getId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return target.getRight() return target
.getRight()
.stream() .stream()
.filter(p -> !existingSoftwares.contains(p.getId())) .filter(p -> !existingSoftwares.contains(p.getId()))
.map(ConversionUtils::oafSoftwareToBrokerSoftware) .map(ConversionUtils::oafSoftwareToBrokerSoftware)

View File

@ -28,7 +28,9 @@ public class EnrichMissingOpenAccess extends UpdateMatcher<Result, Instance> {
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count(); .count();
if (count > 0) { return Arrays.asList(); } if (count > 0) {
return Arrays.asList();
}
return source return source
.getInstance() .getInstance()

View File

@ -15,7 +15,7 @@ public class BrokerConstants {
public static final String OPEN_ACCESS = "OPEN"; public static final String OPEN_ACCESS = "OPEN";
public static final String IS_MERGED_IN_CLASS = "isMergedIn"; public static final String IS_MERGED_IN_CLASS = "isMergedIn";
public static final List<Class<? extends Result>> RESULT_CLASSES = public static final List<Class<? extends Result>> RESULT_CLASSES = Arrays
Arrays.asList(Publication.class, Dataset.class, Software.class, OtherResearchProduct.class); .asList(Publication.class, Dataset.class, Software.class, OtherResearchProduct.class);
} }

View File

@ -56,7 +56,13 @@ public class ConversionUtils {
.setOriginalId(d.getOriginalId().get(0)) .setOriginalId(d.getOriginalId().get(0))
.setTitles(structPropList(d.getTitle())) .setTitles(structPropList(d.getTitle()))
.setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances(d.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())) .setInstances(
d
.getInstance()
.stream()
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(List::stream)
.collect(Collectors.toList()))
.setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())) .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
: null; : null;
} }
@ -74,11 +80,23 @@ public class ConversionUtils {
.setPublisher(fieldValue(result.getPublisher())) .setPublisher(fieldValue(result.getPublisher()))
.setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate()))
.setContributor(fieldList(result.getContributor())) .setContributor(fieldList(result.getContributor()))
.setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) .setJournal(
result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null)
.setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList())) .setCollectedFrom(result.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toList()))
.setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList()))
.setInstances(result.getInstance().stream().map(ConversionUtils::oafInstanceToBrokerInstances).flatMap(List::stream).collect(Collectors.toList())) .setInstances(
.setExternalReferences(result.getExternalReference().stream().map(ConversionUtils::oafExtRefToBrokerExtRef).collect(Collectors.toList())) result
.getInstance()
.stream()
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(List::stream)
.collect(Collectors.toList()))
.setExternalReferences(
result
.getExternalReference()
.stream()
.map(ConversionUtils::oafExtRefToBrokerExtRef)
.collect(Collectors.toList()))
: null; : null;
} }
@ -100,7 +118,9 @@ public class ConversionUtils {
} }
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) {
if (p == null) { return null; } if (p == null) {
return null;
}
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project()
.setTitle(fieldValue(p.getTitle())) .setTitle(fieldValue(p.getTitle()))
@ -136,20 +156,29 @@ public class ConversionUtils {
} }
private static String fieldValue(final List<Field<String>> fl) { private static String fieldValue(final List<Field<String>> fl) {
return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null; return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null)
: null;
} }
private static String structPropValue(final List<StructuredProperty> props) { private static String structPropValue(final List<StructuredProperty> props) {
return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null) : null; return props != null
? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).findFirst().orElse(null)
: null;
} }
private static List<String> fieldList(final List<Field<String>> fl) { private static List<String> fieldList(final List<Field<String>> fl) {
return fl != null ? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList()) return fl != null
? fl.stream().map(Field::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList())
: new ArrayList<>(); : new ArrayList<>();
} }
private static List<String> structPropList(final List<StructuredProperty> props) { private static List<String> structPropList(final List<StructuredProperty> props) {
return props != null ? props.stream().map(StructuredProperty::getValue).filter(StringUtils::isNotBlank).collect(Collectors.toList()) return props != null
? props
.stream()
.map(StructuredProperty::getValue)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList())
: new ArrayList<>(); : new ArrayList<>();
} }
} }

View File

@ -83,8 +83,20 @@ public final class UpdateInfo<T> {
compileHighlight.accept(hl, getHighlightValue()); compileHighlight.accept(hl, getHighlightValue());
final String provId = getSource().getOriginalId().stream().findFirst().orElse(null); final String provId = getSource().getOriginalId().stream().findFirst().orElse(null);
final String provRepo = getSource().getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null); final String provRepo = getSource()
final String provUrl = getSource().getInstance().stream().map(Instance::getUrl).flatMap(List::stream).findFirst().orElse(null);; .getCollectedfrom()
.stream()
.map(KeyValue::getValue)
.findFirst()
.orElse(null);
final String provUrl = getSource()
.getInstance()
.stream()
.map(Instance::getUrl)
.flatMap(List::stream)
.findFirst()
.orElse(null);
;
final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl); final Provenance provenance = new Provenance().setId(provId).setRepositoryName(provRepo).setUrl(provUrl);