From 94bfed1c848fc003a60c6b6c922700bc1f3bae15 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 10 Dec 2020 11:59:28 +0100 Subject: [PATCH 1/8] gzipped output --- .../dhp/broker/oa/CheckDuplictedIdsJob.java | 9 ++++---- .../broker/oa/PartitionEventsByDsIdJob.java | 23 ++++++++----------- .../dhp/broker/oa/util/ClusterUtils.java | 1 + 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java index 5ca865e8f..951afb6c5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -30,9 +30,8 @@ public class CheckDuplictedIdsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - CheckDuplictedIdsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .toString(CheckDuplictedIdsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -59,8 +58,8 @@ public class CheckDuplictedIdsJob { .map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG())) .write() .mode(SaveMode.Overwrite) - .json(countPath); - ; + .option("compression", "gzip") + .json(countPath);; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index e9644122f..1e060d824 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -42,9 +42,8 @@ public class PartitionEventsByDsIdJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - PartitionEventsByDsIdJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); + .toString(PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -67,13 +66,12 @@ public class PartitionEventsByDsIdJob { final Set validOpendoarIds = new HashSet<>(); if (!opendoarIds.trim().equals("-")) { validOpendoarIds - .addAll( - Arrays - .stream(opendoarIds.split(",")) - .map(String::trim) - .filter(StringUtils::isNotBlank) - .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) - .collect(Collectors.toSet())); + .addAll(Arrays + .stream(opendoarIds.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) + .collect(Collectors.toSet())); } log.info("validOpendoarIds: {}", validOpendoarIds); @@ -84,13 +82,12 @@ public class PartitionEventsByDsIdJob { .filter((FilterFunction) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter((FilterFunction) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)) .filter((FilterFunction) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())) - .map( - (MapFunction) e -> messageFromNotification(e), - Encoders.bean(ShortEventMessageWithGroupId.class)) + .map((MapFunction) e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) .coalesce(1) .write() .partitionBy("group") .mode(SaveMode.Overwrite) + .option("compression", "gzip") .json(partitionPath); }); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index d8b8dd807..04985a6ab 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -67,6 +67,7 @@ public class ClusterUtils { .map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz)) .write() .mode(SaveMode.Overwrite) + .option("compression", "gzip") .json(path); } -- 2.17.1 From 2e7df07328cf53083567d990ccba3421cde0a280 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 10 Dec 2020 14:47:22 +0100 Subject: [PATCH 2/8] workingDir and outputDir --- .../dhp/broker/oa/CheckDuplictedIdsJob.java | 6 +-- .../dhp/broker/oa/GenerateEventsJob.java | 19 ++++------ .../dhp/broker/oa/GenerateStatsJob.java | 7 ++-- .../dhp/broker/oa/IndexEventSubsetJob.java | 7 ++-- .../dhp/broker/oa/IndexNotificationsJob.java | 19 ++++------ .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 7 ++-- .../dnetlib/dhp/broker/oa/JoinStep0Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep1Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep2Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep3Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep4Job.java | 10 ++--- .../broker/oa/PartitionEventsByDsIdJob.java | 4 +- .../dhp/broker/oa/PrepareGroupsJob.java | 8 ++-- .../broker/oa/PrepareRelatedDatasetsJob.java | 6 +-- .../oa/PrepareRelatedDatasourcesJob.java | 6 +-- .../broker/oa/PrepareRelatedProjectsJob.java | 6 +-- .../oa/PrepareRelatedPublicationsJob.java | 6 +-- .../broker/oa/PrepareRelatedSoftwaresJob.java | 6 +-- .../broker/oa/PrepareSimpleEntititiesJob.java | 6 +-- .../dhp/broker/oa/check_duplicates.json | 9 +++++ .../dnetlib/dhp/broker/oa/common_params.json | 2 +- .../oa/generate_all/oozie_app/workflow.xml | 37 ++++++++++--------- .../dhp/broker/oa/generate_events.json | 8 +++- .../eu/dnetlib/dhp/broker/oa/index_es.json | 4 +- .../dhp/broker/oa/index_event_subset.json | 4 +- .../dhp/broker/oa/index_notifications.json | 4 +- .../notifications_only/oozie_app/workflow.xml | 6 +-- .../dhp/broker/oa/od_partitions_params.json | 4 +- .../opendoarPartition/oozie_app/workflow.xml | 4 +- .../dnetlib/dhp/broker/oa/stats_params.json | 6 +-- 30 files changed, 128 insertions(+), 123 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java index 951afb6c5..416ee5024 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -31,15 +31,15 @@ public class CheckDuplictedIdsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString(CheckDuplictedIdsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); - final String countPath = parser.get("workingPath") + "/counts"; + final String countPath = parser.get("outputDir") + "/counts"; log.info("countPath: {}", countPath); final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index cfee360c5..9e128d49e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -33,9 +33,8 @@ public class GenerateEventsJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); + .toString(GenerateEventsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -44,10 +43,10 @@ public class GenerateEventsJob { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String eventsPath = workingPath + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final Set dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist"); @@ -70,13 +69,11 @@ public class GenerateEventsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_events"); final Dataset groups = ClusterUtils - .readPath(spark, workingPath + "/duplicates", ResultGroup.class); + .readPath(spark, workingDir + "/duplicates", ResultGroup.class); final Dataset dataset = groups - .map( - g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), - Encoders + .map(g -> EventFinder + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index d5c53ea36..c44aead5e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -33,9 +33,8 @@ public class GenerateStatsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); + .toString(GenerateStatsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -46,7 +45,7 @@ public class GenerateStatsJob { final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String dbUrl = parser.get("dbUrl"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index d3cbe0034..929c31600 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -39,14 +39,13 @@ public class IndexEventSubsetJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexEventSubsetJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); + .toString(IndexEventSubsetJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String index = parser.get("index"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 792a2354a..e1d2e7d24 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,14 +47,13 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String index = parser.get("index"); @@ -117,8 +116,7 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -149,18 +147,15 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 762bfbb90..106528666 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -29,14 +29,13 @@ public class IndexOnESJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexOnESJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); + .toString(IndexOnESJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String index = parser.get("index"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java index 39fa76e43..01778ad74 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java @@ -42,10 +42,10 @@ public class JoinStep0Job { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step0"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step0"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -57,10 +57,10 @@ public class JoinStep0Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/simpleEntities", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class); + .readPath(spark, workingDir + "/relatedDatasources", RelatedDatasource.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java index 8e502f736..82c3619e1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java @@ -40,10 +40,10 @@ public class JoinStep1Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step1"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step1"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -55,10 +55,10 @@ public class JoinStep1Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step0", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedProjects", RelatedProject.class); + .readPath(spark, workingDir + "/relatedProjects", RelatedProject.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java index 55ab497f0..bd6135d41 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java @@ -39,10 +39,10 @@ public class JoinStep2Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step2"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step2"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -54,10 +54,10 @@ public class JoinStep2Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step1", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class); + .readPath(spark, workingDir + "/relatedSoftwares", RelatedSoftware.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java index 4d06f6f13..18e8c00b2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java @@ -40,10 +40,10 @@ public class JoinStep3Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step3"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step3"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -55,10 +55,10 @@ public class JoinStep3Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step2", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class); + .readPath(spark, workingDir + "/relatedDatasets", RelatedDataset.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java index b53d7e39b..965530362 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java @@ -40,10 +40,10 @@ public class JoinStep4Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step4"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step4"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -55,10 +55,10 @@ public class JoinStep4Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step3", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class); + .readPath(spark, workingDir + "/relatedPublications", RelatedPublication.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index 1e060d824..ec694f5d3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -54,10 +54,10 @@ public class PartitionEventsByDsIdJob { final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); - final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; + final String partitionPath = parser.get("outputDir") + "/eventsByOpendoarId"; log.info("partitionPath: {}", partitionPath); final String opendoarIds = parser.get("opendoarIds"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index eb9add00d..a575328e8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -45,10 +45,10 @@ public class PrepareGroupsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String groupsPath = workingPath + "/duplicates"; + final String groupsPath = workingDir + "/duplicates"; log.info("groupsPath: {}", groupsPath); final SparkConf conf = new SparkConf(); @@ -60,7 +60,7 @@ public class PrepareGroupsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups"); final Dataset results = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class); final Dataset mergedRels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index 0cfc1adcb..8c2879b38 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -42,10 +42,10 @@ public class PrepareRelatedDatasetsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedDatasets"; + final String relsPath = workingDir + "/relatedDatasets"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java index 166372a7f..0c2318127 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java @@ -48,10 +48,10 @@ public class PrepareRelatedDatasourcesJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedDatasources"; + final String relsPath = workingDir + "/relatedDatasources"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index e988366c8..1952b5d9c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -44,10 +44,10 @@ public class PrepareRelatedProjectsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedProjects"; + final String relsPath = workingDir + "/relatedProjects"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index 724acc4dc..d6ac27fc5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -43,10 +43,10 @@ public class PrepareRelatedPublicationsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedPublications"; + final String relsPath = workingDir + "/relatedPublications"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index d15565d0d..c4ccf4af6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -44,10 +44,10 @@ public class PrepareRelatedSoftwaresJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedSoftwares"; + final String relsPath = workingDir + "/relatedSoftwares"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java index d3c7113ec..cf4450603 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java @@ -44,10 +44,10 @@ public class PrepareSimpleEntititiesJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String simpleEntitiesPath = workingPath + "/simpleEntities"; + final String simpleEntitiesPath = workingDir + "/simpleEntities"; log.info("simpleEntitiesPath: {}", simpleEntitiesPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json new file mode 100644 index 000000000..2584b78fc --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json @@ -0,0 +1,9 @@ +[ + + { + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the path where the data are stored", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json index adee1888a..0d942cd59 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json @@ -7,7 +7,7 @@ }, { "paramName": "o", - "paramLongName": "workingPath", + "paramLongName": "workingDir", "paramDescription": "the path where the temporary data will be stored", "paramRequired": true } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 14e33b091..1a1df418b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -6,7 +6,7 @@ the path where the graph is stored - workingPath + outputDir the path where the the generated data will be stored @@ -119,7 +119,7 @@ - + @@ -152,7 +152,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -176,7 +176,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -201,7 +201,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -225,7 +225,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -249,7 +249,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -273,7 +273,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -299,7 +299,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -323,7 +323,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -347,7 +347,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -371,7 +371,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -395,7 +395,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -419,7 +419,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -442,7 +442,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --workingDir${workingDir} + --outputDir${outputDir} --datasourceIdWhitelist${datasourceIdWhitelist} --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} @@ -468,7 +469,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --index${esEventIndexName} --esHost${esIndexHost} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} @@ -495,7 +496,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} --brokerApiBaseUrl${brokerApiBaseUrl} @@ -521,7 +522,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --dbUrl${brokerDbUrl} --dbUser${brokerDbUser} --dbPassword${brokerDbPassword} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index bab808193..a482ee2dc 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -1,7 +1,13 @@ [ + { + "paramName": "wp", + "paramLongName": "workingDir", + "paramDescription": "the path where the temporary data are stored", + "paramRequired": true + }, { "paramName": "o", - "paramLongName": "workingPath", + "paramLongName": "outputDir", "paramDescription": "the path where the generated events will be stored", "paramRequired": true }, diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json index ac1dbf786..079709ad8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the workinh path", + "paramLongName": "outputDir", + "paramDescription": "the data path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json index 4921bc03e..441249661 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the workinh path", + "paramLongName": "outputDir", + "paramDescription": "the path where the generated data are stored", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json index 5eea894c8..63e9b1263 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the workinh path", + "paramLongName": "outputDir", + "paramDescription": "the dir that contains the events folder", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 879c0d349..c73b3fb52 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -6,8 +6,8 @@ the path where the graph is stored - workingPath - the path where the the generated data will be stored + outputDir + the path where the the generated data are stored datasourceIdWhitelist @@ -122,7 +122,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json index 10ba926ab..12cd6a391 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the path where the temporary data will be stored", + "paramLongName": "outputDir", + "paramDescription": "the path where the data will be stored", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml index dba3c9f73..83fe47c75 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml @@ -6,7 +6,7 @@ the opendoar IDs whitelist (comma separated) - workingPath + outputDir the path where the the generated data will be stored @@ -87,7 +87,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --workingDir${workingDir} --opendoarIds${opendoarIds} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json index 15d7d251f..2388b1c1f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json @@ -1,8 +1,8 @@ [ { - "paramName": "wp", - "paramLongName": "workingPath", - "paramDescription": "the working path", + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the path where generated data are stored", "paramRequired": true }, { -- 2.17.1 From 933b4c1ada5611572b38f9acb539382ce132f6cb Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 10 Dec 2020 14:47:51 +0100 Subject: [PATCH 3/8] workingDir and outputDir --- .../dhp/broker/oa/CheckDuplictedIdsJob.java | 8 ++++--- .../dhp/broker/oa/GenerateEventsJob.java | 11 ++++++---- .../dhp/broker/oa/GenerateStatsJob.java | 5 +++-- .../dhp/broker/oa/IndexEventSubsetJob.java | 5 +++-- .../dhp/broker/oa/IndexNotificationsJob.java | 17 +++++++++----- .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 5 +++-- .../broker/oa/PartitionEventsByDsIdJob.java | 22 +++++++++++-------- 7 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java index 416ee5024..d42c692f7 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -30,8 +30,9 @@ public class CheckDuplictedIdsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(CheckDuplictedIdsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json"))); + .toString( + CheckDuplictedIdsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -59,7 +60,8 @@ public class CheckDuplictedIdsJob { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(countPath);; + .json(countPath); + ; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index 9e128d49e..457b82517 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -33,8 +33,9 @@ public class GenerateEventsJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateEventsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); + .toString( + GenerateEventsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -72,8 +73,10 @@ public class GenerateEventsJob { .readPath(spark, workingDir + "/duplicates", ResultGroup.class); final Dataset dataset = groups - .map(g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders + .map( + g -> EventFinder + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), + Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index c44aead5e..2772f8fd1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -33,8 +33,9 @@ public class GenerateStatsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); + .toString( + GenerateStatsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index 929c31600..72efc9e6b 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -39,8 +39,9 @@ public class IndexEventSubsetJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexEventSubsetJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); + .toString( + IndexEventSubsetJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index e1d2e7d24..29fc72d04 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,8 +47,9 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString( + IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); @@ -116,7 +117,8 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -147,15 +149,18 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch(c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 106528666..006cde48c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -29,8 +29,9 @@ public class IndexOnESJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(IndexOnESJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); + .toString( + IndexOnESJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index ec694f5d3..08c74a291 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -42,8 +42,9 @@ public class PartitionEventsByDsIdJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(PartitionEventsByDsIdJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); + .toString( + PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -66,12 +67,13 @@ public class PartitionEventsByDsIdJob { final Set validOpendoarIds = new HashSet<>(); if (!opendoarIds.trim().equals("-")) { validOpendoarIds - .addAll(Arrays - .stream(opendoarIds.split(",")) - .map(String::trim) - .filter(StringUtils::isNotBlank) - .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) - .collect(Collectors.toSet())); + .addAll( + Arrays + .stream(opendoarIds.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) + .collect(Collectors.toSet())); } log.info("validOpendoarIds: {}", validOpendoarIds); @@ -82,7 +84,9 @@ public class PartitionEventsByDsIdJob { .filter((FilterFunction) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter((FilterFunction) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)) .filter((FilterFunction) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())) - .map((MapFunction) e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) + .map( + (MapFunction) e -> messageFromNotification(e), + Encoders.bean(ShortEventMessageWithGroupId.class)) .coalesce(1) .write() .partitionBy("group") -- 2.17.1 From 399548f221eb252c381c58d7df77352620b048c8 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Dec 2020 11:03:55 +0100 Subject: [PATCH 4/8] whitelist of topics --- .../dhp/broker/oa/GenerateEventsJob.java | 5 +- .../dhp/broker/oa/util/EventFinder.java | 9 +- .../oa/generate_all/oozie_app/workflow.xml | 6 + .../dhp/broker/oa/generate_events.json | 6 + .../oa/reindex/oozie_app/config-default.xml | 18 +++ .../broker/oa/reindex/oozie_app/workflow.xml | 116 ++++++++++++++++++ 6 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index 457b82517..1ae241e34 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -59,6 +59,9 @@ public class GenerateEventsJob { final Set dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist"); log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ",")); + final Set topicWhitelist = ClusterUtils.parseParamAsList(parser, "topicWhitelist"); + log.info("topicWhitelist: {}", StringUtils.join(topicWhitelist, ",")); + final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { @@ -75,7 +78,7 @@ public class GenerateEventsJob { final Dataset dataset = groups .map( g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, topicWhitelist, accumulators), Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 1ab56cc34..103751f95 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -76,6 +76,7 @@ public class EventFinder { final Set dsIdWhitelist, final Set dsIdBlacklist, final Set dsTypeWhitelist, + final Set topicWhitelist, final Map accumulators) { final List> list = new ArrayList<>(); @@ -84,7 +85,13 @@ public class EventFinder { for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) { if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { for (final UpdateMatcher matcher : matchers) { - list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators)); + for (final UpdateInfo info : matcher + .searchUpdatesForRecord(target, targetDs, results.getData(), accumulators)) { + if (topicWhitelist == null || topicWhitelist.isEmpty() + || topicWhitelist.contains(info.getTopic().getPath())) { + list.add(info); + } + } } } } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 1a1df418b..ea81c90d0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -24,6 +24,11 @@ - a black list (comma separeted, - for empty list) of datasource ids + + topicWhitelist + * + a white list (comma separeted, * for all) of topics + esEventIndexName the elasticsearch index name for events @@ -447,6 +452,7 @@ --datasourceIdWhitelist${datasourceIdWhitelist} --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} + --topicWhitelist${topicWhitelist} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index a482ee2dc..e803bb5b9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -28,5 +28,11 @@ "paramLongName": "datasourceIdBlacklist", "paramDescription": "a black list (comma separeted, - for empty list) of datasource ids", "paramRequired": true + }, + { + "paramName": "topicWhitelist", + "paramLongName": "topicWhitelist", + "paramDescription": "a white list (comma separeted, * for all) of topics", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml new file mode 100644 index 000000000..43c26f3a3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml @@ -0,0 +1,116 @@ + + + + + outputDir + the path where the the generated data will be stored + + + esEventIndexName + the elasticsearch index name for events + + + esIndexHost + the elasticsearch host + + + maxIndexedEventsForDsAndTopic + the max number of events for each couple (ds/topic) + + + brokerApiBaseUrl + the url of the broker service api + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + IndexEventSubsetOnESJob + eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob + dhp-broker-events-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --index${esEventIndexName} + --esHost${esIndexHost} + --maxEventsForTopic${maxIndexedEventsForDsAndTopic} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + + + + \ No newline at end of file -- 2.17.1 From d03756c962c31b8dc7929242dcca9e990b28a51a Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Dec 2020 11:11:41 +0100 Subject: [PATCH 5/8] mkdir of output dir --- .../dhp/broker/oa/generate_all/oozie_app/workflow.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index ea81c90d0..250f928f8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -116,15 +116,15 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + - + -- 2.17.1 From a203aee32a61c67a20ebb2965ade8e150880089e Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Dec 2020 12:02:33 +0100 Subject: [PATCH 6/8] ES wf properties --- .../dhp/broker/oa/IndexEventSubsetJob.java | 20 ++++++++++--- .../dhp/broker/oa/IndexNotificationsJob.java | 20 ++++++++++--- .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 22 +++++++++++---- .../oa/generate_all/oozie_app/workflow.xml | 28 +++++++++++++++++++ .../eu/dnetlib/dhp/broker/oa/index_es.json | 24 ++++++++++++++++ .../dhp/broker/oa/index_event_subset.json | 24 ++++++++++++++++ .../dhp/broker/oa/index_notifications.json | 24 ++++++++++++++++ .../notifications_only/oozie_app/workflow.xml | 24 ++++++++++++++++ .../broker/oa/reindex/oozie_app/workflow.xml | 24 ++++++++++++++++ 9 files changed, 196 insertions(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index 72efc9e6b..e18a7ef56 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -55,6 +55,18 @@ public class IndexEventSubsetJob { final String indexHost = parser.get("esHost"); log.info("indexHost: {}", indexHost); + final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount"); + log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount); + + final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait"); + log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait); + + final String esBatchSizeEntries = parser.get("esBatchSizeEntries"); + log.info("esBatchSizeEntries: {}", esBatchSizeEntries); + + final String esNodesWanOnly = parser.get("esNodesWanOnly"); + log.info("esNodesWanOnly: {}", esNodesWanOnly); + final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); log.info("maxEventsForTopic: {}", maxEventsForTopic); @@ -86,10 +98,10 @@ public class IndexEventSubsetJob { esCfg.put("es.index.auto.create", "false"); esCfg.put("es.nodes", indexHost); esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", "8"); - esCfg.put("es.batch.write.retry.wait", "60s"); - esCfg.put("es.batch.size.entries", "200"); - esCfg.put("es.nodes.wan.only", "true"); + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 29fc72d04..75f4eb066 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -63,6 +63,18 @@ public class IndexNotificationsJob { final String indexHost = parser.get("esHost"); log.info("indexHost: {}", indexHost); + final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount"); + log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount); + + final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait"); + log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait); + + final String esBatchSizeEntries = parser.get("esBatchSizeEntries"); + log.info("esBatchSizeEntries: {}", esBatchSizeEntries); + + final String esNodesWanOnly = parser.get("esNodesWanOnly"); + log.info("esNodesWanOnly: {}", esNodesWanOnly); + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); @@ -92,10 +104,10 @@ public class IndexNotificationsJob { esCfg.put("es.index.auto.create", "false"); esCfg.put("es.nodes", indexHost); esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", "8"); - esCfg.put("es.batch.write.retry.wait", "60s"); - esCfg.put("es.batch.size.entries", "200"); - esCfg.put("es.nodes.wan.only", "true"); + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 006cde48c..380a689e4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -45,6 +45,18 @@ public class IndexOnESJob { final String indexHost = parser.get("esHost"); log.info("indexHost: {}", indexHost); + final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount"); + log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount); + + final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait"); + log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait); + + final String esBatchSizeEntries = parser.get("esBatchSizeEntries"); + log.info("esBatchSizeEntries: {}", esBatchSizeEntries); + + final String esNodesWanOnly = parser.get("esNodesWanOnly"); + log.info("esNodesWanOnly: {}", esNodesWanOnly); + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final JavaRDD inputRdd = ClusterUtils @@ -53,15 +65,13 @@ public class IndexOnESJob { .javaRDD(); final Map esCfg = new HashMap<>(); - // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); - esCfg.put("es.index.auto.create", "false"); esCfg.put("es.nodes", indexHost); esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", "8"); - esCfg.put("es.batch.write.retry.wait", "60s"); - esCfg.put("es.batch.size.entries", "200"); - esCfg.put("es.nodes.wan.only", "true"); + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 250f928f8..d06eec87e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -41,6 +41,26 @@ esIndexHost the elasticsearch host + + esBatchWriteRetryCount + 8 + an ES configuration property + + + esBatchWriteRetryWait + 60s + an ES configuration property + + + esBatchSizeEntries + 200 + an ES configuration property + + + esNodesWanOnly + true + an ES configuration property + maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) @@ -478,6 +498,10 @@ --outputDir${outputDir} --index${esEventIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} @@ -505,6 +529,10 @@ --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json index 079709ad8..f7e072d0f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json @@ -16,5 +16,29 @@ "paramLongName": "esHost", "paramDescription": "the ES host", "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryCount", + "paramLongName": "esBatchWriteRetryCount", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryWait", + "paramLongName": "esBatchWriteRetryWait", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchSizeEntries", + "paramLongName": "esBatchSizeEntries", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esNodesWanOnly", + "paramLongName": "esNodesWanOnly", + "paramDescription": "an ES configuration property", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json index 441249661..0046490bb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -16,7 +16,31 @@ "paramLongName": "esHost", "paramDescription": "the ES host", "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryCount", + "paramLongName": "esBatchWriteRetryCount", + "paramDescription": "an ES configuration property", + "paramRequired": true }, + { + "paramName": "esBatchWriteRetryWait", + "paramLongName": "esBatchWriteRetryWait", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchSizeEntries", + "paramLongName": "esBatchSizeEntries", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esNodesWanOnly", + "paramLongName": "esNodesWanOnly", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, { "paramName": "n", "paramLongName": "maxEventsForTopic", diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json index 63e9b1263..370b48411 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json @@ -17,6 +17,30 @@ "paramDescription": "the ES host", "paramRequired": true }, + { + "paramName": "esBatchWriteRetryCount", + "paramLongName": "esBatchWriteRetryCount", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryWait", + "paramLongName": "esBatchWriteRetryWait", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchSizeEntries", + "paramLongName": "esBatchSizeEntries", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esNodesWanOnly", + "paramLongName": "esNodesWanOnly", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, { "paramName": "broker", "paramLongName": "brokerApiBaseUrl", diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index c73b3fb52..248326d57 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -36,6 +36,26 @@ esIndexHost the elasticsearch host + + esBatchWriteRetryCount + 8 + an ES configuration property + + + esBatchWriteRetryWait + 60s + an ES configuration property + + + esBatchSizeEntries + 200 + an ES configuration property + + + esNodesWanOnly + true + an ES configuration property + maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) @@ -125,6 +145,10 @@ --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml index 43c26f3a3..9095004ad 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml @@ -13,6 +13,26 @@ esIndexHost the elasticsearch host + + esBatchWriteRetryCount + 8 + an ES configuration property + + + esBatchWriteRetryWait + 60s + an ES configuration property + + + esBatchSizeEntries + 200 + an ES configuration property + + + esNodesWanOnly + true + an ES configuration property + maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) @@ -102,6 +122,10 @@ --outputDir${outputDir} --index${esEventIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} -- 2.17.1 From 3e19cf7b4a214373c77ac2fd8cde52fceff0d478 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 14 Dec 2020 15:24:33 +0100 Subject: [PATCH 7/8] openaireId --- .../dhp/broker/oa/util/ConversionUtils.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 053627a5f..085681488 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -74,7 +74,7 @@ public class ConversionUtils { } final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset(); - res.setOpenaireId(d.getId()); + res.setOpenaireId(cleanOpenaireId(d.getId())); res.setOriginalId(first(d.getOriginalId())); res.setTitle(structPropValue(d.getTitle())); res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid)); @@ -89,7 +89,7 @@ public class ConversionUtils { } final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication(); - res.setOpenaireId(p.getId()); + res.setOpenaireId(cleanOpenaireId(p.getId())); res.setOriginalId(first(p.getOriginalId())); res.setTitle(structPropValue(p.getTitle())); res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid)); @@ -106,7 +106,7 @@ public class ConversionUtils { final OaBrokerMainEntity res = new OaBrokerMainEntity(); - res.setOpenaireId(result.getId()); + res.setOpenaireId(cleanOpenaireId(result.getId())); res.setOriginalId(first(result.getOriginalId())); res.setTypology(classId(result.getResulttype())); res.setTitles(structPropList(result.getTitle())); @@ -129,6 +129,10 @@ public class ConversionUtils { return res; } + private static String cleanOpenaireId(final String id) { + return id.contains("|") ? StringUtils.substringAfter(id, "|") : id; + } + private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) { if (author == null) { return null; @@ -188,7 +192,7 @@ public class ConversionUtils { } final OaBrokerProject res = new OaBrokerProject(); - res.setOpenaireId(p.getId()); + res.setOpenaireId(cleanOpenaireId(p.getId())); res.setTitle(fieldValue(p.getTitle())); res.setAcronym(fieldValue(p.getAcronym())); res.setCode(fieldValue(p.getCode())); @@ -214,7 +218,7 @@ public class ConversionUtils { } final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware(); - res.setOpenaireId(sw.getId()); + res.setOpenaireId(cleanOpenaireId(sw.getId())); res.setName(structPropValue(sw.getTitle())); res.setDescription(fieldValue(sw.getDescription())); res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); @@ -230,7 +234,7 @@ public class ConversionUtils { final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource(); res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname()))); - res.setOpenaireId(ds.getId()); + res.setOpenaireId(cleanOpenaireId(ds.getId())); res.setType(classId(ds.getDatasourcetype())); return res; } -- 2.17.1 From 12fa5d122ae959842c9d28602144ffe7514eeeaa Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 15 Dec 2020 08:30:26 +0100 Subject: [PATCH 8/8] fixed a problem with join --- .../eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java | 2 +- .../dhp/broker/oa/PrepareRelatedDatasetsJob.java | 5 +++-- .../dhp/broker/oa/PrepareRelatedProjectsJob.java | 2 +- .../broker/oa/PrepareRelatedPublicationsJob.java | 5 +++-- .../dhp/broker/oa/PrepareRelatedSoftwaresJob.java | 2 +- .../eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java | 11 +++++++++++ .../dnetlib/dhp/broker/oa/util/ConversionUtils.java | 2 +- .../oa/util/DatasourceRelationsAccumulator.java | 13 +++++++++++-- 8 files changed, 32 insertions(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index a575328e8..dc156cbcf 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -63,7 +63,7 @@ public class PrepareGroupsJob { .readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class); final Dataset mergedRels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)); final TypedColumn, ResultGroup> aggr = new ResultAggregator() diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index 8c2879b38..9bdf32a64 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -62,7 +62,7 @@ public class PrepareRelatedDatasetsJob { .map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) @@ -72,7 +72,8 @@ public class PrepareRelatedDatasetsJob { final Dataset dataset = rels .joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner") .map(t -> { - final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2); + final RelatedDataset rel = new RelatedDataset(t._1.getSource(), + t._2); rel.getRelDataset().setRelType(t._1.getRelClass()); return rel; }, Encoders.bean(RelatedDataset.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index 1952b5d9c..9498c0f33 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -64,7 +64,7 @@ public class PrepareRelatedProjectsJob { .map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index d6ac27fc5..8270500fd 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -65,7 +65,7 @@ public class PrepareRelatedPublicationsJob { Encoders.bean(OaBrokerRelatedPublication.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass())) @@ -75,7 +75,8 @@ public class PrepareRelatedPublicationsJob { final Dataset dataset = rels .joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner") .map(t -> { - final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2); + final RelatedPublication rel = new RelatedPublication( + t._1.getSource(), t._2); rel.getRelPublication().setRelType(t._1.getRelClass()); return rel; }, Encoders.bean(RelatedPublication.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index c4ccf4af6..16b450733 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -64,7 +64,7 @@ public class PrepareRelatedSoftwaresJob { .map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class)); final Dataset rels = ClusterUtils - .readPath(spark, graphPath + "/relation", Relation.class) + .loadRelations(graphPath, spark) .filter(r -> r.getDataInfo().getDeletedbyinference()) .filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java index 04985a6ab..9ce64f6bd 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; public class ClusterUtils { @@ -30,6 +31,16 @@ public class ClusterUtils { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } + public static Dataset loadRelations(final String graphPath, final SparkSession spark) { + return ClusterUtils + .readPath(spark, graphPath + "/relation", Relation.class) + .map(r -> { + r.setSource(ConversionUtils.cleanOpenaireId(r.getSource())); + r.setTarget(ConversionUtils.cleanOpenaireId(r.getTarget())); + return r; + }, Encoders.bean(Relation.class)); + } + public static Dataset readPath( final SparkSession spark, final String inputPath, diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 085681488..ecbfd821e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -129,7 +129,7 @@ public class ConversionUtils { return res; } - private static String cleanOpenaireId(final String id) { + public static String cleanOpenaireId(final String id) { return id.contains("|") ? StringUtils.substringAfter(id, "|") : id; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java index 75c4625ce..c693be93c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java @@ -59,9 +59,18 @@ public class DatasourceRelationsAccumulator implements Serializable { final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator(); collectedFromSet .stream() - .map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL)) + .map( + s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s), + BrokerConstants.COLLECTED_FROM_REL)) .forEach(res::addTuple); - hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple); + + hostedBySet + .stream() + .map( + s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s), + BrokerConstants.HOSTED_BY_REL)) + .forEach(res::addTuple); + return res; } -- 2.17.1