From fba11eef2ab857ab16810c4f522be2d7e9b06127 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 7 Dec 2020 21:53:13 +0100
Subject: [PATCH] cleanup

---
 .../broker/oa/PartitionEventsByDsIdJob.java | 31 ++++++++++++-------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
index 65d5e6f946..e9644122f3 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
@@ -18,6 +18,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.slf4j.Logger;
@@ -40,8 +42,9 @@ public class PartitionEventsByDsIdJob {
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
 			IOUtils
-				.toString(PartitionEventsByDsIdJob.class
-					.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json")));
+				.toString(
+					PartitionEventsByDsIdJob.class
+						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json")));
 		parser.parseArgument(args);
 
 		final Boolean isSparkSessionManaged = Optional
@@ -63,21 +66,27 @@ public class PartitionEventsByDsIdJob {
 
 		final Set<String> validOpendoarIds = new HashSet<>();
 		if (!opendoarIds.trim().equals("-")) {
-			validOpendoarIds.addAll(Arrays.stream(opendoarIds.split(","))
-				.map(String::trim)
-				.filter(StringUtils::isNotBlank)
-				.map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s))
-				.collect(Collectors.toSet()));
+			validOpendoarIds
+				.addAll(
+					Arrays
+						.stream(opendoarIds.split(","))
+						.map(String::trim)
+						.filter(StringUtils::isNotBlank)
+						.map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s))
+						.collect(Collectors.toSet()));
 		}
+		log.info("validOpendoarIds: {}", validOpendoarIds);
 
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 
 			ClusterUtils
 				.readPath(spark, eventsPath, Event.class)
-				.filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
-				.filter(e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX))
-				.filter(e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId()))
-				.map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class))
+				.filter((FilterFunction<Event>) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
+				.filter((FilterFunction<Event>) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX))
+				.filter((FilterFunction<Event>) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId()))
+				.map(
+					(MapFunction<Event, ShortEventMessageWithGroupId>) e -> messageFromNotification(e),
+					Encoders.bean(ShortEventMessageWithGroupId.class))
 				.coalesce(1)
 				.write()
 				.partitionBy("group")
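
Note, added for context and not part of the commit above: the explicit casts this patch introduces matter because Spark's Dataset API overloads filter (a String condition expression vs. a FilterFunction<T>) and, for Java callers, a bare lambda passed to map can be ambiguous between the Scala Function1 overload and the Java MapFunction overload. Casting the lambda pins the intended overload and pairs it with an explicit Encoder. Below is a minimal, self-contained sketch of the same pattern; Msg is a hypothetical bean standing in for Event and ShortEventMessageWithGroupId, and everything else uses only the standard Spark SQL API.

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class TypedLambdaDemo {

	// Hypothetical bean, stand-in for Event / ShortEventMessageWithGroupId.
	// Encoders.bean requires a public class with a no-arg constructor and getters/setters.
	public static class Msg implements Serializable {
		private String id;

		public Msg() {
		}

		public Msg(final String id) {
			this.id = id;
		}

		public String getId() {
			return id;
		}

		public void setId(final String id) {
			this.id = id;
		}
	}

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("TypedLambdaDemo")
			.getOrCreate();

		final Dataset<Msg> ds = spark
			.createDataset(Arrays.asList(new Msg("a"), new Msg("")), Encoders.bean(Msg.class));

		ds
			// Dataset.filter is overloaded (String condition vs. FilterFunction<T>);
			// the cast selects the functional-interface overload unambiguously.
			.filter((FilterFunction<Msg>) m -> !m.getId().isEmpty())
			// Dataset.map accepts either a Scala Function1 or a Java MapFunction;
			// the cast resolves the ambiguity, and the typed output needs its own Encoder.
			.map((MapFunction<Msg, String>) Msg::getId, Encoders.STRING())
			.show();

		spark.stop();
	}
}

Without the casts, javac can report the lambdas as ambiguous against the Scala-derived overloads, which is presumably why this patch adds them alongside the formatting cleanup.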