From 10f3f7eca7c4339315b08458ccb195ed2992e019 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 10 Dec 2020 14:47:22 +0100 Subject: [PATCH] workingDir and outputDir --- .../dhp/broker/oa/CheckDuplictedIdsJob.java | 6 +-- .../dhp/broker/oa/GenerateEventsJob.java | 19 ++++------ .../dhp/broker/oa/GenerateStatsJob.java | 7 ++-- .../dhp/broker/oa/IndexEventSubsetJob.java | 7 ++-- .../dhp/broker/oa/IndexNotificationsJob.java | 19 ++++------ .../dnetlib/dhp/broker/oa/IndexOnESJob.java | 7 ++-- .../dnetlib/dhp/broker/oa/JoinStep0Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep1Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep2Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep3Job.java | 10 ++--- .../dnetlib/dhp/broker/oa/JoinStep4Job.java | 10 ++--- .../broker/oa/PartitionEventsByDsIdJob.java | 4 +- .../dhp/broker/oa/PrepareGroupsJob.java | 8 ++-- .../broker/oa/PrepareRelatedDatasetsJob.java | 6 +-- .../oa/PrepareRelatedDatasourcesJob.java | 6 +-- .../broker/oa/PrepareRelatedProjectsJob.java | 6 +-- .../oa/PrepareRelatedPublicationsJob.java | 6 +-- .../broker/oa/PrepareRelatedSoftwaresJob.java | 6 +-- .../broker/oa/PrepareSimpleEntititiesJob.java | 6 +-- .../dhp/broker/oa/check_duplicates.json | 9 +++++ .../dnetlib/dhp/broker/oa/common_params.json | 2 +- .../oa/generate_all/oozie_app/workflow.xml | 37 ++++++++++--------- .../dhp/broker/oa/generate_events.json | 8 +++- .../eu/dnetlib/dhp/broker/oa/index_es.json | 4 +- .../dhp/broker/oa/index_event_subset.json | 4 +- .../dhp/broker/oa/index_notifications.json | 4 +- .../notifications_only/oozie_app/workflow.xml | 6 +-- .../dhp/broker/oa/od_partitions_params.json | 4 +- .../opendoarPartition/oozie_app/workflow.xml | 4 +- .../dnetlib/dhp/broker/oa/stats_params.json | 6 +-- 30 files changed, 128 insertions(+), 123 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java index 951afb6c5e..416ee5024f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java @@ -31,15 +31,15 @@ public class CheckDuplictedIdsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils .toString(CheckDuplictedIdsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); - final String countPath = parser.get("workingPath") + "/counts"; + final String countPath = parser.get("outputDir") + "/counts"; log.info("countPath: {}", countPath); final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index cfee360c57..9e128d49e5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -33,9 +33,8 @@ public class GenerateEventsJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateEventsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); + .toString(GenerateEventsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -44,10 +43,10 @@ public class GenerateEventsJob { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String eventsPath = workingPath + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final Set dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist"); @@ -70,13 +69,11 @@ public class GenerateEventsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_events"); final Dataset groups = ClusterUtils - .readPath(spark, workingPath + "/duplicates", ResultGroup.class); + .readPath(spark, workingDir + "/duplicates", ResultGroup.class); final Dataset dataset = groups - .map( - g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), - Encoders + .map(g -> EventFinder + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index d5c53ea360..c44aead5e9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -33,9 +33,8 @@ public class GenerateStatsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - GenerateStatsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); + .toString(GenerateStatsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -46,7 +45,7 @@ public class GenerateStatsJob { final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String dbUrl = parser.get("dbUrl"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index d3cbe0034f..929c316004 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -39,14 +39,13 @@ public class IndexEventSubsetJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexEventSubsetJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); + .toString(IndexEventSubsetJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String index = parser.get("index"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 792a2354a4..e1d2e7d245 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -47,14 +47,13 @@ public class IndexNotificationsJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexNotificationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); + .toString(IndexNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String index = parser.get("index"); @@ -117,8 +116,7 @@ public class IndexNotificationsJob { final long date) { final List list = subscriptions .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .map(s -> generateNotification(s, e, date)) .collect(Collectors.toList()); @@ -149,18 +147,15 @@ public class IndexNotificationsJob { if (conditions.containsKey("trust") && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { + .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) { return false; } if (conditions.containsKey("targetDateofacceptance") && !conditions .get("targetDateofacceptance") .stream() - .anyMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + .anyMatch(c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { return false; } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 762bfbb903..106528666c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -29,14 +29,13 @@ public class IndexOnESJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString( - IndexOnESJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); + .toString(IndexOnESJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); parser.parseArgument(args); final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); final String index = parser.get("index"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java index 39fa76e43d..01778ad74f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java @@ -42,10 +42,10 @@ public class JoinStep0Job { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step0"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step0"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -57,10 +57,10 @@ public class JoinStep0Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/simpleEntities", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class); + .readPath(spark, workingDir + "/relatedDatasources", RelatedDatasource.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java index 8e502f7366..82c3619e10 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java @@ -40,10 +40,10 @@ public class JoinStep1Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step1"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step1"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -55,10 +55,10 @@ public class JoinStep1Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step0", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedProjects", RelatedProject.class); + .readPath(spark, workingDir + "/relatedProjects", RelatedProject.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java index 55ab497f07..bd6135d413 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java @@ -39,10 +39,10 @@ public class JoinStep2Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step2"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step2"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -54,10 +54,10 @@ public class JoinStep2Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step1", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class); + .readPath(spark, workingDir + "/relatedSoftwares", RelatedSoftware.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java index 4d06f6f133..18e8c00b2a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java @@ -40,10 +40,10 @@ public class JoinStep3Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step3"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step3"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -55,10 +55,10 @@ public class JoinStep3Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step2", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class); + .readPath(spark, workingDir + "/relatedDatasets", RelatedDataset.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java index b53d7e39b9..9655303620 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java @@ -40,10 +40,10 @@ public class JoinStep4Job { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String joinedEntitiesPath = workingPath + "/joinedEntities_step4"; + final String joinedEntitiesPath = workingDir + "/joinedEntities_step4"; log.info("joinedEntitiesPath: {}", joinedEntitiesPath); final SparkConf conf = new SparkConf(); @@ -55,10 +55,10 @@ public class JoinStep4Job { final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final Dataset sources = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step3", OaBrokerMainEntity.class); final Dataset typedRels = ClusterUtils - .readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class); + .readPath(spark, workingDir + "/relatedPublications", RelatedPublication.class); final TypedColumn, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator() .toColumn(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index 1e060d824f..ec694f5d36 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -54,10 +54,10 @@ public class PartitionEventsByDsIdJob { final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("workingPath") + "/events"; + final String eventsPath = parser.get("outputDir") + "/events"; log.info("eventsPath: {}", eventsPath); - final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; + final String partitionPath = parser.get("outputDir") + "/eventsByOpendoarId"; log.info("partitionPath: {}", partitionPath); final String opendoarIds = parser.get("opendoarIds"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java index eb9add00de..a575328e8d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java @@ -45,10 +45,10 @@ public class PrepareGroupsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String groupsPath = workingPath + "/duplicates"; + final String groupsPath = workingDir + "/duplicates"; log.info("groupsPath: {}", groupsPath); final SparkConf conf = new SparkConf(); @@ -60,7 +60,7 @@ public class PrepareGroupsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups"); final Dataset results = ClusterUtils - .readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class); + .readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class); final Dataset mergedRels = ClusterUtils .readPath(spark, graphPath + "/relation", Relation.class) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java index 0cfc1adcbb..8c2879b38d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java @@ -42,10 +42,10 @@ public class PrepareRelatedDatasetsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedDatasets"; + final String relsPath = workingDir + "/relatedDatasets"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java index 166372a7f4..0c23181274 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java @@ -48,10 +48,10 @@ public class PrepareRelatedDatasourcesJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedDatasources"; + final String relsPath = workingDir + "/relatedDatasources"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java index e988366c8c..1952b5d9c9 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java @@ -44,10 +44,10 @@ public class PrepareRelatedProjectsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedProjects"; + final String relsPath = workingDir + "/relatedProjects"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java index 724acc4dce..d6ac27fc51 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java @@ -43,10 +43,10 @@ public class PrepareRelatedPublicationsJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedPublications"; + final String relsPath = workingDir + "/relatedPublications"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java index d15565d0d7..c4ccf4af68 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java @@ -44,10 +44,10 @@ public class PrepareRelatedSoftwaresJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String relsPath = workingPath + "/relatedSoftwares"; + final String relsPath = workingDir + "/relatedSoftwares"; log.info("relsPath: {}", relsPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java index d3c7113ec0..cf44506038 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java @@ -44,10 +44,10 @@ public class PrepareSimpleEntititiesJob { final String graphPath = parser.get("graphPath"); log.info("graphPath: {}", graphPath); - final String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String simpleEntitiesPath = workingPath + "/simpleEntities"; + final String simpleEntitiesPath = workingDir + "/simpleEntities"; log.info("simpleEntitiesPath: {}", simpleEntitiesPath); final SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json new file mode 100644 index 0000000000..2584b78fcc --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json @@ -0,0 +1,9 @@ +[ + + { + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the path where the data are stored", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json index adee1888a1..0d942cd59f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json @@ -7,7 +7,7 @@ }, { "paramName": "o", - "paramLongName": "workingPath", + "paramLongName": "workingDir", "paramDescription": "the path where the temporary data will be stored", "paramRequired": true } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 14e33b0916..1a1df418b8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -6,7 +6,7 @@ the path where the graph is stored - workingPath + outputDir the path where the the generated data will be stored @@ -119,7 +119,7 @@ - + @@ -152,7 +152,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -176,7 +176,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -201,7 +201,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -225,7 +225,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -249,7 +249,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -273,7 +273,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -299,7 +299,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -323,7 +323,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -347,7 +347,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -371,7 +371,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -395,7 +395,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -419,7 +419,7 @@ --conf spark.sql.shuffle.partitions=3840 --graphPath${graphInputPath} - --workingPath${workingPath} + --workingDir${workingDir} @@ -442,7 +442,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --workingDir${workingDir} + --outputDir${outputDir} --datasourceIdWhitelist${datasourceIdWhitelist} --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} @@ -468,7 +469,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --index${esEventIndexName} --esHost${esIndexHost} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} @@ -495,7 +496,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} --brokerApiBaseUrl${brokerApiBaseUrl} @@ -521,7 +522,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --dbUrl${brokerDbUrl} --dbUser${brokerDbUser} --dbPassword${brokerDbPassword} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index bab8081932..a482ee2dc6 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -1,7 +1,13 @@ [ + { + "paramName": "wp", + "paramLongName": "workingDir", + "paramDescription": "the path where the temporary data are stored", + "paramRequired": true + }, { "paramName": "o", - "paramLongName": "workingPath", + "paramLongName": "outputDir", "paramDescription": "the path where the generated events will be stored", "paramRequired": true }, diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json index ac1dbf7867..079709ad89 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the workinh path", + "paramLongName": "outputDir", + "paramDescription": "the data path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json index 4921bc03e2..441249661e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the workinh path", + "paramLongName": "outputDir", + "paramDescription": "the path where the generated data are stored", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json index 5eea894c8e..63e9b12637 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the workinh path", + "paramLongName": "outputDir", + "paramDescription": "the dir that contains the events folder", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 879c0d3490..c73b3fb524 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -6,8 +6,8 @@ the path where the graph is stored - workingPath - the path where the the generated data will be stored + outputDir + the path where the the generated data are stored datasourceIdWhitelist @@ -122,7 +122,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json index 10ba926abf..12cd6a391c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json @@ -1,8 +1,8 @@ [ { "paramName": "o", - "paramLongName": "workingPath", - "paramDescription": "the path where the temporary data will be stored", + "paramLongName": "outputDir", + "paramDescription": "the path where the data will be stored", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml index dba3c9f73b..83fe47c758 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml @@ -6,7 +6,7 @@ the opendoar IDs whitelist (comma separated) - workingPath + outputDir the path where the the generated data will be stored @@ -87,7 +87,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --workingPath${workingPath} + --workingDir${workingDir} --opendoarIds${opendoarIds} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json index 15d7d251f6..2388b1c1f4 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json @@ -1,8 +1,8 @@ [ { - "paramName": "wp", - "paramLongName": "workingPath", - "paramDescription": "the working path", + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the path where generated data are stored", "paramRequired": true }, {