From 933b4c1ada5611572b38f9acb539382ce132f6cb Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 10 Dec 2020 14:47:51 +0100 Subject: [PATCH] workingDir and outputDir --- .../dhp/broker/oa/CheckDuplictedIdsJob.java | 8 ++++--- .../dhp/broker/oa/GenerateEventsJob.java | 11 ++++++---- .../dhp/broker/oa/GenerateStatsJob.java | 5 +++-- .../dhp/broker/oa/IndexEventSubsetJob.java | 5 +++-- .../dhp/broker/oa/IndexNotificationsJob.java | 17 +++++++++----- .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 5 +++-- .../broker/oa/PartitionEventsByDsIdJob.java | 22 +++++++++++-------- 7 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java index 416ee5024..d42c692f7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -30,8 +30,9 @@ public class CheckDuplictedIdsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(CheckDuplictedIdsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json"))); + .toString( + CheckDuplictedIdsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -59,7 +60,8 @@ public class CheckDuplictedIdsJob { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(countPath);; + .json(countPath); + ; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index 9e128d49e..457b82517 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -33,8 +33,9 @@ public class GenerateEventsJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateEventsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); + .toString( + GenerateEventsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -72,8 +73,10 @@ public class GenerateEventsJob { .readPath(spark, workingDir + "/duplicates", ResultGroup.class); final Dataset dataset = groups - .map(g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders + .map( + g -> EventFinder + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), + Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index c44aead5e..2772f8fd1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -33,8 +33,9 @@ public class GenerateStatsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); + .toString( + GenerateStatsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index 929c31600..72efc9e6b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -39,8 +39,9 @@ public class IndexEventSubsetJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexEventSubsetJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); + .toString( + IndexEventSubsetJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index e1d2e7d24..29fc72d04 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,8 +47,9 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -116,7 +117,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -147,15 +149,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 106528666..006cde48c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -29,8 +29,9 @@ public class IndexOnESJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexOnESJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); + .toString( + IndexOnESJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index ec694f5d3..08c74a291 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -42,8 +42,9 @@ public class PartitionEventsByDsIdJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(PartitionEventsByDsIdJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); + .toString( + PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -66,12 +67,13 @@ public class PartitionEventsByDsIdJob { final Set validOpendoarIds = new HashSet<>(); if (!opendoarIds.trim().equals("-")) { validOpendoarIds - .addAll(Arrays - .stream(opendoarIds.split(",")) - .map(String::trim) - .filter(StringUtils::isNotBlank) - .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) - .collect(Collectors.toSet())); + .addAll( + Arrays + .stream(opendoarIds.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) + .collect(Collectors.toSet())); } log.info("validOpendoarIds: {}", validOpendoarIds); @@ -82,7 +84,9 @@ public class PartitionEventsByDsIdJob { .filter((FilterFunction) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter((FilterFunction) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)) .filter((FilterFunction) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())) - .map((MapFunction) e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) + .map( + (MapFunction) e -> messageFromNotification(e), + Encoders.bean(ShortEventMessageWithGroupId.class)) .coalesce(1) .write() .partitionBy("group")