From 9e2c23e39190647b9f3f832e28d3bb4f4f9e3a11 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 16 Jun 2020 15:55:42 +0200 Subject: [PATCH] partial refactoring --- .../broker/oa/GenerateEventsApplication.java | 43 ++++++---- .../dhp/broker/oa/util/ConversionUtils.java | 82 +++++++++++-------- .../withRels/RelatedEntityFactory.java | 12 ++- 3 files changed, 83 insertions(+), 54 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 5efaed784..c1f12f43c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -51,8 +51,9 @@ public class GenerateEventsApplication { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); + .toString( + GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -117,11 +118,14 @@ public class GenerateEventsApplication { return results .joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") - .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .groupByKey( + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) - .map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) + .map( + (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), + Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } @@ -130,7 +134,8 @@ public class GenerateEventsApplication { final String graphPath, final Class sourceClass) { final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset datasets = readPath( + spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); @@ -138,14 +143,17 @@ public class GenerateEventsApplication { .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .cache(); - final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + final Dataset r0 = readPath( + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; + final Dataset r4 = join( + r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + ; return r4; } @@ -155,7 +163,9 @@ public class GenerateEventsApplication { final Class clazz) { return rels .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") - .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); + .map( + t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), + Encoders.kryo(clazz)); } private static Dataset join(final Dataset sources, @@ -163,11 +173,13 @@ public class GenerateEventsApplication { final Dataset typedRels) { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() - .toColumn();; + .toColumn(); + ; return sources .joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") - .groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); } @@ -186,8 +198,11 @@ public class GenerateEventsApplication { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final String conf = isLookUpService - .getResourceProfileByQuery(String - .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); + .getResourceProfileByQuery( + String + .format( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", + profId)); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); dedupConfig.getPace().initModel(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 3693f4ce4..d04ef45a0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -57,12 +57,13 @@ public class ConversionUtils { .setOriginalId(d.getOriginalId().get(0)) .setTitle(structPropValue(d.getTitle())) .setPids(d.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances(d - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) + .setInstances( + d + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) .setCollectedFrom(d.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } @@ -72,12 +73,13 @@ public class ConversionUtils { .setOriginalId(p.getOriginalId().get(0)) .setTitle(structPropValue(p.getTitle())) .setPids(p.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances(p - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) + .setInstances( + p + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) .setCollectedFrom(p.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) : null; } @@ -92,48 +94,54 @@ public class ConversionUtils { .setAbstracts(fieldList(result.getDescription())) .setLanguage(result.getLanguage().getClassid()) .setSubjects(structPropTypedList(result.getSubject())) - .setCreators(result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) + .setCreators( + result.getAuthor().stream().map(ConversionUtils::oafAuthorToBrokerAuthor).collect(Collectors.toList())) .setPublicationdate(result.getDateofacceptance().getValue()) .setPublisher(fieldValue(result.getPublisher())) .setEmbargoenddate(fieldValue(result.getEmbargoenddate())) .setContributor(fieldList(result.getContributor())) - .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) + .setJournal( + result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null) .setCollectedFromId(result.getCollectedfrom().stream().map(KeyValue::getKey).findFirst().orElse(null)) .setCollectedFromName(result.getCollectedfrom().stream().map(KeyValue::getValue).findFirst().orElse(null)) .setPids(result.getPid().stream().map(ConversionUtils::oafPidToBrokerPid).collect(Collectors.toList())) - .setInstances(result - .getInstance() - .stream() - .map(ConversionUtils::oafInstanceToBrokerInstances) - .flatMap(List::stream) - .collect(Collectors.toList())) - .setExternalReferences(result - .getExternalReference() - .stream() - .map(ConversionUtils::oafExtRefToBrokerExtRef) - .collect(Collectors.toList())) + .setInstances( + result + .getInstance() + .stream() + .map(ConversionUtils::oafInstanceToBrokerInstances) + .flatMap(List::stream) + .collect(Collectors.toList())) + .setExternalReferences( + result + .getExternalReference() + .stream() + .map(ConversionUtils::oafExtRefToBrokerExtRef) + .collect(Collectors.toList())) : null; } private static List structPropTypedList(final List list) { return list .stream() - .map(p -> new TypedValue() - .setValue(p.getValue()) - .setType(p.getQualifier().getClassid())) + .map( + p -> new TypedValue() + .setValue(p.getValue()) + .setType(p.getQualifier().getClassid())) .collect(Collectors.toList()); } private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { return author != null ? new eu.dnetlib.broker.objects.Author() .setFullname(author.getFullname()) - .setOrcid(author - .getPid() - .stream() - .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) - .map(pid -> pid.getValue()) - .findFirst() - .orElse(null)) + .setOrcid( + author + .getPid() + .stream() + .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .map(pid -> pid.getValue()) + .findFirst() + .orElse(null)) : null; } @@ -155,7 +163,9 @@ public class ConversionUtils { } public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { - if (p == null) { return null; } + if (p == null) { + return null; + } final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project() .setTitle(fieldValue(p.getTitle())) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java index 88dab2ba3..c60d4f141 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/withRels/RelatedEntityFactory.java @@ -16,13 +16,17 @@ public class RelatedEntityFactory { final Class clazz) { if (clazz == RelatedProject.class) { - return (RT) new RelatedProject(sourceId, relType, ConversionUtils.oafProjectToBrokerProject((Project) target)); + return (RT) new RelatedProject(sourceId, relType, + ConversionUtils.oafProjectToBrokerProject((Project) target)); } else if (clazz == RelatedSoftware.class) { - return (RT) new RelatedSoftware(sourceId, relType, ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); + return (RT) new RelatedSoftware(sourceId, relType, + ConversionUtils.oafSoftwareToBrokerSoftware((Software) target)); } else if (clazz == RelatedDataset.class) { - return (RT) new RelatedDataset(sourceId, relType, ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); + return (RT) new RelatedDataset(sourceId, relType, + ConversionUtils.oafDatasetToBrokerDataset((Dataset) target)); } else if (clazz == RelatedPublication.class) { - return (RT) new RelatedPublication(sourceId, relType, ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); + return (RT) new RelatedPublication(sourceId, relType, + ConversionUtils.oafPublicationToBrokerPublication((Publication) target)); } else { return null; }