From e659b02e6bca84954cdda331f19c9ed1c9503c7a Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 18 Jun 2020 13:15:13 +0200 Subject: [PATCH] some wf fixing --- .../broker/oa/GenerateEventsApplication.java | 46 +++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java index 980b01fe1..ae72e83ce 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsApplication.java @@ -50,8 +50,9 @@ public class GenerateEventsApplication { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateEventsApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); + .toString( + GenerateEventsApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -119,11 +120,14 @@ public class GenerateEventsApplication { return results .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner") - .groupByKey((MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) + .groupByKey( + (MapFunction, String>) t -> t._2.getTarget(), Encoders.STRING()) .agg(aggr) .map((MapFunction, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .filter(ResultGroup::isValid) - .map((MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class)) + .map( + (MapFunction) g -> EventFinder.generateEvents(g, dedupConfig), + Encoders.kryo(EventGroup.class)) .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); } @@ -133,7 +137,8 @@ public class GenerateEventsApplication { final Class sourceClass) { final Dataset projects = readPath(spark, graphPath + "/project", Project.class); - final Dataset datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + final Dataset datasets = readPath( + spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); final Dataset softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset publications = readPath(spark, graphPath + "/publication", Publication.class); @@ -141,14 +146,17 @@ public class GenerateEventsApplication { .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .cache(); - final Dataset r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) - .filter(r -> r.getDataInfo().getDeletedbyinference()) - .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); + final Dataset r0 = readPath( + spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) + .filter(r -> r.getDataInfo().getDeletedbyinference()) + .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class)); final Dataset r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); - final Dataset r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));; + final Dataset r4 = join( + r3, rels, relatedEntities(publications, rels, RelatedProject.class)); + ; return r4; } @@ -158,7 +166,9 @@ public class GenerateEventsApplication { final Class clazz) { return rels .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") - .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz)); + .map( + t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), + Encoders.kryo(clazz)); } private static Dataset join(final Dataset sources, @@ -166,10 +176,13 @@ public class GenerateEventsApplication { final Dataset typedRels) { final TypedColumn, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator() - .toColumn();; + .toColumn(); + ; - return sources.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") - .groupByKey((MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) + return sources + .joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer") + .groupByKey( + (MapFunction, String>) t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); @@ -190,8 +203,11 @@ public class GenerateEventsApplication { final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final String conf = isLookUpService - .getResourceProfileByQuery(String - .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId)); + .getResourceProfileByQuery( + String + .format( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", + profId)); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); dedupConfig.getPace().initModel();