From a41e0cb64899dc3673dd119e93a2550b4b8c70ec Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 11 Jun 2020 12:28:34 +0200 Subject: [PATCH 1/5] missing landingPage urls in instances --- .../eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index e6a744fc0..7ff483aff 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -138,9 +138,16 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='URL']")) { url.add(((Node) o).getText().trim()); } + for (final Object o : doc + .selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='landingPage']")) { + url.add(((Node) o).getText().trim()); + } for (final Object o : doc.selectNodes("//datacite:identifier[@identifierType='URL']")) { url.add(((Node) o).getText().trim()); } + for (final Object o : doc.selectNodes("//datacite:identifier[@identifierType='landingPage']")) { + url.add(((Node) o).getText().trim()); + } for (final Object o : doc.selectNodes("//datacite:alternateIdentifier[@alternateIdentifierType='DOI']")) { url.add(HTTP_DX_DOI_PREIFX + ((Node) o).getText().trim()); } @@ -379,11 +386,13 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper { res .addAll( prepareListStructPropsWithValidQualifier( - doc, "//datacite:identifier[@identifierType != 'URL']", "@identifierType", DNET_PID_TYPES, info)); + doc, "//datacite:identifier[@identifierType != 'URL' and @identifierType != 'landingPage']", + "@identifierType", DNET_PID_TYPES, info)); res .addAll( prepareListStructPropsWithValidQualifier( - doc, "//datacite:alternateIdentifier[@alternateIdentifierType != 'URL']", + doc, + "//datacite:alternateIdentifier[@alternateIdentifierType != 'URL' and @alternateIdentifierType != 'landingPage']", "@alternateIdentifierType", DNET_PID_TYPES, info)); return res; } From 48959e9a1710f627a902dc986e07e180193edb9e Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 11 Jun 2020 14:24:02 +0200 Subject: [PATCH 2/5] orcid events --- .../simple/EnrichMissingAuthorOrcid.java | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index 1226aaf45..14021480d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -1,41 +1,68 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; - -import org.apache.commons.lang3.tuple.Pair; +import java.util.Set; +import java.util.stream.Collectors; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingAuthorOrcid extends UpdateMatcher> { +public class EnrichMissingAuthorOrcid extends UpdateMatcher { public EnrichMissingAuthorOrcid() { super(true); } @Override - protected List>> findUpdates(final ResultWithRelations source, + protected List> findUpdates(final ResultWithRelations source, final ResultWithRelations target, final DedupConfig dedupConfig) { - // TODO - // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); - return Arrays.asList(); + + final Set existingOrcids = target + .getResult() + .getAuthor() + .stream() + .map(Author::getPid) + .flatMap(List::stream) + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .collect(Collectors.toSet()); + + final List> list = new ArrayList<>(); + + for (final Author author : source.getResult().getAuthor()) { + final String name = author.getFullname(); + + for (final StructuredProperty pid : author.getPid()) { + if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid") + && !existingOrcids.contains(pid.getValue())) { + list + .add( + generateUpdateInfo(name + " [ORCID: " + pid.getValue() + "]", source, target, dedupConfig)); + ; + } + } + } + + return list; } - public UpdateInfo> generateUpdateInfo(final Pair highlightValue, + public UpdateInfo generateUpdateInfo(final String highlightValue, final ResultWithRelations source, final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_AUTHOR_ORCID, highlightValue, source, target, - (p, pair) -> p.getCreators().add(pair.getLeft() + " - ORCID: " + pair.getRight()), - pair -> pair.getLeft() + "::" + pair.getRight(), + (p, aut) -> p.getCreators().add(aut), + aut -> aut, dedupConfig); } } From c2e1b66e83b6bda2cdf863a1c10e6e02411baea1 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 11 Jun 2020 14:28:03 +0200 Subject: [PATCH 3/5] Revert "orcid events" This reverts commit 48959e9a1710f627a902dc986e07e180193edb9e. --- .../simple/EnrichMissingAuthorOrcid.java | 49 +++++-------------- 1 file changed, 11 insertions(+), 38 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java index 14021480d..1226aaf45 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/simple/EnrichMissingAuthorOrcid.java @@ -1,68 +1,41 @@ package eu.dnetlib.dhp.broker.oa.matchers.simple; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; + +import org.apache.commons.lang3.tuple.Pair; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.ResultWithRelations; -import eu.dnetlib.dhp.schema.oaf.Author; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.pace.config.DedupConfig; -public class EnrichMissingAuthorOrcid extends UpdateMatcher { +public class EnrichMissingAuthorOrcid extends UpdateMatcher> { public EnrichMissingAuthorOrcid() { super(true); } @Override - protected List> findUpdates(final ResultWithRelations source, + protected List>> findUpdates(final ResultWithRelations source, final ResultWithRelations target, final DedupConfig dedupConfig) { - - final Set existingOrcids = target - .getResult() - .getAuthor() - .stream() - .map(Author::getPid) - .flatMap(List::stream) - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) - .collect(Collectors.toSet()); - - final List> list = new ArrayList<>(); - - for (final Author author : source.getResult().getAuthor()) { - final String name = author.getFullname(); - - for (final StructuredProperty pid : author.getPid()) { - if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid") - && !existingOrcids.contains(pid.getValue())) { - list - .add( - generateUpdateInfo(name + " [ORCID: " + pid.getValue() + "]", source, target, dedupConfig)); - ; - } - } - } - - return list; + // TODO + // return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f)); + return Arrays.asList(); } - public UpdateInfo generateUpdateInfo(final String highlightValue, + public UpdateInfo> generateUpdateInfo(final Pair highlightValue, final ResultWithRelations source, final ResultWithRelations target, final DedupConfig dedupConfig) { return new UpdateInfo<>( Topic.ENRICH_MISSING_AUTHOR_ORCID, highlightValue, source, target, - (p, aut) -> p.getCreators().add(aut), - aut -> aut, + (p, pair) -> p.getCreators().add(pair.getLeft() + " - ORCID: " + pair.getRight()), + pair -> pair.getLeft() + "::" + pair.getRight(), dedupConfig); } } From 76ea7607f723788f5d390bc5cd08546fd152091f Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 16 Jun 2020 15:53:13 +0200 Subject: [PATCH 4/5] partial refactoring --- .../broker/oa/GenerateEventsApplication.java | 46 ++++------ .../dhp/broker/oa/util/ConversionUtils.java | 84 ++++++++++--------- .../withRels/RelatedEntityFactory.java | 30 +++---- 3 files changed, 76 insertions(+), 84 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 940d7f9f3..5efaed784 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -51,9 +51,8 @@ public class GenerateEventsApplication { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); + .toString(GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -118,14 +117,11 @@ public class GenerateEventsApplication { return results .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") - .groupByKey( - (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) - .map( - (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), - Encoders.kryo(EventGroup.class)) + .map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } @@ -134,25 +130,22 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath( - spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + .cache(); - final Dataset r0 = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join( - r3, rels, relatedEntities(publications, rels, RelatedProject.class)); - ; + final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; return r4; } @@ -162,9 +155,7 @@ public class GenerateEventsApplication { final Class clazz) { return rels .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), - Encoders.kryo(clazz)); + .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); } private static Dataset join(final Dataset sources, @@ -172,13 +163,11 @@ public class GenerateEventsApplication { final Dataset typedRels) { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() - .toColumn(); - ; + .toColumn();; return sources .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") - .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + .groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); } @@ -197,11 +186,8 @@ public class GenerateEventsApplication { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final String conf = isLookUpService - .getResourceProfileByQuery( - String - .format( - "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", - profId)); + .getResourceProfileByQuery(String + .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); dedupConfig.getPace().initModel(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index b80848682..3693f4ce4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -57,17 +57,31 @@ public class ConversionUtils { .setOriginalId(d.getOriginalId().get(0)) .setTitle(structPropValue(d.getTitle())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - d - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) + .setInstances(d + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } + public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) { + return p != null ? new eu.dnetlib.broker.objects.Publication() + .setOriginalId(p.getOriginalId().get(0)) + .setTitle(structPropValue(p.getTitle())) + .setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) + .setInstances(p + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) + .setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) + : null; + } + public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { return result != null ? new OpenaireBrokerResult() @@ -78,54 +92,48 @@ public class ConversionUtils { .setAbstracts(fieldList(result.getDescription())) .setLanguage(result.getLanguage().getClassid()) .setSubjects(structPropTypedList(result.getSubject())) - .setCreators( - result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) + .setCreators(result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) .setPublicationdate(result.getDateofacceptance().getValue()) .setPublisher(fieldValue(result.getPublisher())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setContributor(fieldList(result.getContributor())) - .setJournal( - result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) + .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - result - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setExternalReferences( - result - .getExternalReference() - .stream() - .map(ConversionUtils::oafExtRefToBrokerExtRef) - .collect(Collectors.toList())) + .setInstances(result + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) + .setExternalReferences(result + .getExternalReference() + .stream() + .map(ConversionUtils::oafExtRefToBrokerExtRef) + .collect(Collectors.toList())) : null; } private static List structPropTypedList(final List list) { return list .stream() - .map( - p -> new TypedValue() - .setValue(p.getValue()) - .setType(p.getQualifier().getClassid())) + .map(p -> new TypedValue() + .setValue(p.getValue()) + .setType(p.getQualifier().getClassid())) .collect(Collectors.toList()); } private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { return author != null ? new eu.dnetlib.broker.objects.Author() .setFullname(author.getFullname()) - .setOrcid( - author - .getPid() - .stream() - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) - .findFirst() - .orElse(null)) + .setOrcid(author + .getPid() + .stream() + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .findFirst() + .orElse(null)) : null; } @@ -147,9 +155,7 @@ public class ConversionUtils { } public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { - if (p == null) { - return null; - } + if (p == null) { return null; } final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() .setTitle(fieldValue(p.getTitle())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java index 08d57a1ea..88dab2ba3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java @@ -1,10 +1,11 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.broker.objects.Dataset; -import eu.dnetlib.broker.objects.Project; -import eu.dnetlib.broker.objects.Publication; -import eu.dnetlib.broker.objects.Software; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; public class RelatedEntityFactory { @@ -13,18 +14,17 @@ public class RelatedEntityFactory { final String relType, final T target, final Class clazz) { + if (clazz == RelatedProject.class) { - return (RT) new RelatedProject(sourceId, relType, (Project) target); + return (RT) new RelatedProject(sourceId, relType, ConversionUtils.oafProjectToBrokerProject((Project) target)); + } else if (clazz == RelatedSoftware.class) { + return (RT) new RelatedSoftware(sourceId, relType, ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); + } else if (clazz == RelatedDataset.class) { + return (RT) new RelatedDataset(sourceId, relType, ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); + } else if (clazz == RelatedPublication.class) { + return (RT) new RelatedPublication(sourceId, relType, ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); + } else { + return null; } - if (clazz == RelatedSoftware.class) { - return (RT) new RelatedSoftware(sourceId, relType, (Software) target); - } - if (clazz == RelatedDataset.class) { - return (RT) new RelatedDataset(sourceId, relType, (Dataset) target); - } - if (clazz == RelatedPublication.class) { - return (RT) new RelatedPublication(sourceId, relType, (Publication) target); - } - return null; } } From 9e2c23e39190647b9f3f832e28d3bb4f4f9e3a11 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 16 Jun 2020 15:55:42 +0200 Subject: [PATCH 5/5] partial refactoring --- .../broker/oa/GenerateEventsApplication.java | 43 ++++++---- .../dhp/broker/oa/util/ConversionUtils.java | 82 +++++++++++-------- .../withRels/RelatedEntityFactory.java | 12 ++- 3 files changed, 83 insertions(+), 54 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 5efaed784..c1f12f43c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -51,8 +51,9 @@ public class GenerateEventsApplication { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); + .toString( + GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -117,11 +118,14 @@ public class GenerateEventsApplication { return results .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") - .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .groupByKey( + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) - .map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) + .map( + (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), + Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } @@ -130,7 +134,8 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset datasets = readPath( + spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); @@ -138,14 +143,17 @@ public class GenerateEventsApplication { .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .cache(); - final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + final Dataset r0 = readPath( + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; + final Dataset r4 = join( + r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + ; return r4; } @@ -155,7 +163,9 @@ public class GenerateEventsApplication { final Class clazz) { return rels .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") - .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); + .map( + t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), + Encoders.kryo(clazz)); } private static Dataset join(final Dataset sources, @@ -163,11 +173,13 @@ public class GenerateEventsApplication { final Dataset typedRels) { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() - .toColumn();; + .toColumn(); + ; return sources .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") - .groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); } @@ -186,8 +198,11 @@ public class GenerateEventsApplication { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final String conf = isLookUpService - .getResourceProfileByQuery(String - .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); + .getResourceProfileByQuery( + String + .format( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", + profId)); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); dedupConfig.getPace().initModel(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 3693f4ce4..d04ef45a0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -57,12 +57,13 @@ public class ConversionUtils { .setOriginalId(d.getOriginalId().get(0)) .setTitle(structPropValue(d.getTitle())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances(d - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) + .setInstances( + d + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } @@ -72,12 +73,13 @@ public class ConversionUtils { .setOriginalId(p.getOriginalId().get(0)) .setTitle(structPropValue(p.getTitle())) .setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances(p - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) + .setInstances( + p + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) .setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } @@ -92,48 +94,54 @@ public class ConversionUtils { .setAbstracts(fieldList(result.getDescription())) .setLanguage(result.getLanguage().getClassid()) .setSubjects(structPropTypedList(result.getSubject())) - .setCreators(result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) + .setCreators( + result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) .setPublicationdate(result.getDateofacceptance().getValue()) .setPublisher(fieldValue(result.getPublisher())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setContributor(fieldList(result.getContributor())) - .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) + .setJournal( + result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances(result - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setExternalReferences(result - .getExternalReference() - .stream() - .map(ConversionUtils::oafExtRefToBrokerExtRef) - .collect(Collectors.toList())) + .setInstances( + result + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) + .setExternalReferences( + result + .getExternalReference() + .stream() + .map(ConversionUtils::oafExtRefToBrokerExtRef) + .collect(Collectors.toList())) : null; } private static List structPropTypedList(final List list) { return list .stream() - .map(p -> new TypedValue() - .setValue(p.getValue()) - .setType(p.getQualifier().getClassid())) + .map( + p -> new TypedValue() + .setValue(p.getValue()) + .setType(p.getQualifier().getClassid())) .collect(Collectors.toList()); } private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { return author != null ? new eu.dnetlib.broker.objects.Author() .setFullname(author.getFullname()) - .setOrcid(author - .getPid() - .stream() - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) - .findFirst() - .orElse(null)) + .setOrcid( + author + .getPid() + .stream() + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .findFirst() + .orElse(null)) : null; } @@ -155,7 +163,9 @@ public class ConversionUtils { } public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { - if (p == null) { return null; } + if (p == null) { + return null; + } final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() .setTitle(fieldValue(p.getTitle())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java index 88dab2ba3..c60d4f141 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java @@ -16,13 +16,17 @@ public class RelatedEntityFactory { final Class clazz) { if (clazz == RelatedProject.class) { - return (RT) new RelatedProject(sourceId, relType, ConversionUtils.oafProjectToBrokerProject((Project) target)); + return (RT) new RelatedProject(sourceId, relType, + ConversionUtils.oafProjectToBrokerProject((Project) target)); } else if (clazz == RelatedSoftware.class) { - return (RT) new RelatedSoftware(sourceId, relType, ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); + return (RT) new RelatedSoftware(sourceId, relType, + ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); } else if (clazz == RelatedDataset.class) { - return (RT) new RelatedDataset(sourceId, relType, ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); + return (RT) new RelatedDataset(sourceId, relType, + ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); } else if (clazz == RelatedPublication.class) { - return (RT) new RelatedPublication(sourceId, relType, ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); + return (RT) new RelatedPublication(sourceId, relType, + ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); } else { return null; }