From 76ea7607f723788f5d390bc5cd08546fd152091f Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 16 Jun 2020 15:53:13 +0200 Subject: [PATCH] partial refactoring --- .../broker/oa/GenerateEventsApplication.java | 46 ++++------ .../dhp/broker/oa/util/ConversionUtils.java | 84 ++++++++++--------- .../withRels/RelatedEntityFactory.java | 30 +++---- 3 files changed, 76 insertions(+), 84 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 940d7f9f32..5efaed784a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -51,9 +51,8 @@ public class GenerateEventsApplication { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); + .toString(GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -118,14 +117,11 @@ public class GenerateEventsApplication { return results .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") - .groupByKey( - (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) - .map( - (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), - Encoders.kryo(EventGroup.class)) + .map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } @@ -134,25 +130,22 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath( - spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); final Dataset rels = readPath(spark, graphPath + "/relation", Relation.class) - .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); + .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) + .cache(); - final Dataset r0 = readPath( - spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join( - r3, rels, relatedEntities(publications, rels, RelatedProject.class)); - ; + final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; return r4; } @@ -162,9 +155,7 @@ public class GenerateEventsApplication { final Class clazz) { return rels .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") - .map( - t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), - Encoders.kryo(clazz)); + .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); } private static Dataset join(final Dataset sources, @@ -172,13 +163,11 @@ public class GenerateEventsApplication { final Dataset typedRels) { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() - .toColumn(); - ; + .toColumn();; return sources .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") - .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + .groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); } @@ -197,11 +186,8 @@ public class GenerateEventsApplication { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final String conf = isLookUpService - .getResourceProfileByQuery( - String - .format( - "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", - profId)); + .getResourceProfileByQuery(String + .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); dedupConfig.getPace().initModel(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index b80848682a..3693f4ce43 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -57,17 +57,31 @@ public class ConversionUtils { .setOriginalId(d.getOriginalId().get(0)) .setTitle(structPropValue(d.getTitle())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - d - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) + .setInstances(d + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } + public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) { + return p != null ? new eu.dnetlib.broker.objects.Publication() + .setOriginalId(p.getOriginalId().get(0)) + .setTitle(structPropValue(p.getTitle())) + .setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) + .setInstances(p + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) + .setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) + : null; + } + public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { return result != null ? new OpenaireBrokerResult() @@ -78,54 +92,48 @@ public class ConversionUtils { .setAbstracts(fieldList(result.getDescription())) .setLanguage(result.getLanguage().getClassid()) .setSubjects(structPropTypedList(result.getSubject())) - .setCreators( - result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) + .setCreators(result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) .setPublicationdate(result.getDateofacceptance().getValue()) .setPublisher(fieldValue(result.getPublisher())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setContributor(fieldList(result.getContributor())) - .setJournal( - result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) + .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances( - result - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setExternalReferences( - result - .getExternalReference() - .stream() - .map(ConversionUtils::oafExtRefToBrokerExtRef) - .collect(Collectors.toList())) + .setInstances(result + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) + .setExternalReferences(result + .getExternalReference() + .stream() + .map(ConversionUtils::oafExtRefToBrokerExtRef) + .collect(Collectors.toList())) : null; } private static List structPropTypedList(final List list) { return list .stream() - .map( - p -> new TypedValue() - .setValue(p.getValue()) - .setType(p.getQualifier().getClassid())) + .map(p -> new TypedValue() + .setValue(p.getValue()) + .setType(p.getQualifier().getClassid())) .collect(Collectors.toList()); } private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { return author != null ? new eu.dnetlib.broker.objects.Author() .setFullname(author.getFullname()) - .setOrcid( - author - .getPid() - .stream() - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) - .findFirst() - .orElse(null)) + .setOrcid(author + .getPid() + .stream() + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .findFirst() + .orElse(null)) : null; } @@ -147,9 +155,7 @@ public class ConversionUtils { } public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { - if (p == null) { - return null; - } + if (p == null) { return null; } final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() .setTitle(fieldValue(p.getTitle())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java index 08d57a1ea7..88dab2ba35 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java @@ -1,10 +1,11 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels; -import eu.dnetlib.broker.objects.Dataset; -import eu.dnetlib.broker.objects.Project; -import eu.dnetlib.broker.objects.Publication; -import eu.dnetlib.broker.objects.Software; +import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; public class RelatedEntityFactory { @@ -13,18 +14,17 @@ public class RelatedEntityFactory { final String relType, final T target, final Class clazz) { + if (clazz == RelatedProject.class) { - return (RT) new RelatedProject(sourceId, relType, (Project) target); + return (RT) new RelatedProject(sourceId, relType, ConversionUtils.oafProjectToBrokerProject((Project) target)); + } else if (clazz == RelatedSoftware.class) { + return (RT) new RelatedSoftware(sourceId, relType, ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); + } else if (clazz == RelatedDataset.class) { + return (RT) new RelatedDataset(sourceId, relType, ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); + } else if (clazz == RelatedPublication.class) { + return (RT) new RelatedPublication(sourceId, relType, ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); + } else { + return null; } - if (clazz == RelatedSoftware.class) { - return (RT) new RelatedSoftware(sourceId, relType, (Software) target); - } - if (clazz == RelatedDataset.class) { - return (RT) new RelatedDataset(sourceId, relType, (Dataset) target); - } - if (clazz == RelatedPublication.class) { - return (RT) new RelatedPublication(sourceId, relType, (Publication) target); - } - return null; } }