diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java
index 5ca865e8fc..d42c692f7b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/CheckDuplictedIdsJob.java
@@ -32,15 +32,15 @@ public class CheckDuplictedIdsJob {
 			IOUtils
 				.toString(
 					CheckDuplictedIdsJob.class
-						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
+						.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json")));
 		parser.parseArgument(args);
 
 		final SparkConf conf = new SparkConf();
 
-		final String eventsPath = parser.get("workingPath") + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
-		final String countPath = parser.get("workingPath") + "/counts";
+		final String countPath = parser.get("outputDir") + "/counts";
 		log.info("countPath: {}", countPath);
 
 		final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
@@ -59,6 +59,7 @@ public class CheckDuplictedIdsJob {
 			.map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
 			.write()
 			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
 			.json(countPath);
 		;

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
index cfee360c57..1ae241e340 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java
@@ -44,10 +44,10 @@ public class GenerateEventsJob {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String eventsPath = workingPath + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
 		final Set<String> dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist");
@@ -59,6 +59,9 @@ public class GenerateEventsJob {
 		final Set<String> dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist");
 		log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
 
+		final Set<String> topicWhitelist = ClusterUtils.parseParamAsList(parser, "topicWhitelist");
+		log.info("topicWhitelist: {}", StringUtils.join(topicWhitelist, ","));
+
 		final SparkConf conf = new SparkConf();
 
 		runWithSparkSession(conf, isSparkSessionManaged, spark -> {
@@ -70,12 +73,12 @@ public class GenerateEventsJob {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
 
 			final Dataset<ResultGroup> groups = ClusterUtils
-				.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
+				.readPath(spark, workingDir + "/duplicates", ResultGroup.class);
 
 			final Dataset<Event> dataset = groups
 				.map(
 					g -> EventFinder
-						.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
+						.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, topicWhitelist, accumulators),
 					Encoders
 						.bean(EventGroup.class))
 				.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));
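Note: the new topicWhitelist argument follows the same comma-separated list convention as the datasource whitelists. The helper below is a minimal sketch of how such a value can be parsed into a Set; ClusterUtils.parseParamAsList is not shown in this diff, so the body (including the "-"/"*" sentinel handling) is an assumption, not the project's actual implementation.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ParamListSketch {

	// Hypothetical stand-in for ClusterUtils.parseParamAsList: splits a
	// comma-separated CLI value into a trimmed, non-empty set of tokens.
	static Set<String> parseParamAsList(final String value) {
		final String v = value == null ? "" : value.trim();
		if (v.isEmpty() || "-".equals(v) || "*".equals(v)) {
			return new HashSet<>(); // empty set = filtering disabled
		}
		return Arrays
			.stream(v.split(","))
			.map(String::trim)
			.filter(s -> !s.isEmpty())
			.collect(Collectors.toSet());
	}

	public static void main(final String[] args) {
		System.out.println(parseParamAsList("ENRICH/MISSING/PID, ENRICH/MORE/PID")); // two topics
		System.out.println(parseParamAsList("*")); // empty set: whitelist disabled
	}
}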
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
index d5c53ea360..2772f8fd16 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java
@@ -46,7 +46,7 @@ public class GenerateStatsJob {
 
 		final SparkConf conf = new SparkConf();
 
-		final String eventsPath = parser.get("workingPath") + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
 		final String dbUrl = parser.get("dbUrl");

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java
index d3cbe0034f..e18a7ef560 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java
@@ -46,7 +46,7 @@ public class IndexEventSubsetJob {
 
 		final SparkConf conf = new SparkConf();
 
-		final String eventsPath = parser.get("workingPath") + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
 		final String index = parser.get("index");
@@ -55,6 +55,18 @@ public class IndexEventSubsetJob {
 		final String indexHost = parser.get("esHost");
 		log.info("indexHost: {}", indexHost);
 
+		final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
+		log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
+
+		final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
+		log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
+
+		final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
+		log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
+
+		final String esNodesWanOnly = parser.get("esNodesWanOnly");
+		log.info("esNodesWanOnly: {}", esNodesWanOnly);
+
 		final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic"));
 		log.info("maxEventsForTopic: {}", maxEventsForTopic);
@@ -86,10 +98,10 @@ public class IndexEventSubsetJob {
 		esCfg.put("es.index.auto.create", "false");
 		esCfg.put("es.nodes", indexHost);
 		esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
-		esCfg.put("es.batch.write.retry.count", "8");
-		esCfg.put("es.batch.write.retry.wait", "60s");
-		esCfg.put("es.batch.size.entries", "200");
-		esCfg.put("es.nodes.wan.only", "true");
+		esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
+		esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
+		esCfg.put("es.batch.size.entries", esBatchSizeEntries);
+		esCfg.put("es.nodes.wan.only", esNodesWanOnly);
 
 		log.info("*** Start indexing");
 		JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
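Note: the hard-coded es-hadoop tuning values become job parameters here. The snippet below is a standalone sketch of the same pattern, with defaults mirroring the previously hard-coded values; the host, index name and system-property plumbing are illustrative assumptions, while JavaEsSpark.saveJsonToEs(JavaRDD<String>, String, Map) is the real es-hadoop API used in the diff.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class EsIndexSketch {

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession.builder().appName("es-index-sketch").getOrCreate();
		final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		// One JSON document per line; es.mapping.id must name a field of the document.
		final JavaRDD<String> docs = jsc
			.parallelize(Arrays.asList("{\"eventId\":\"e1\",\"topic\":\"ENRICH/MISSING/PID\"}"));

		final Map<String, String> esCfg = new HashMap<>();
		esCfg.put("es.nodes", System.getProperty("esHost", "localhost"));
		esCfg.put("es.mapping.id", "eventId");
		// Externalized tuning knobs; defaults mirror the values removed by this commit.
		esCfg.put("es.batch.write.retry.count", System.getProperty("esBatchWriteRetryCount", "8"));
		esCfg.put("es.batch.write.retry.wait", System.getProperty("esBatchWriteRetryWait", "60s"));
		esCfg.put("es.batch.size.entries", System.getProperty("esBatchSizeEntries", "200"));
		esCfg.put("es.nodes.wan.only", System.getProperty("esNodesWanOnly", "true"));

		JavaEsSpark.saveJsonToEs(docs, "events_index", esCfg); // index name is illustrative
		spark.stop();
	}
}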
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java
index 792a2354a4..75f4eb066d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java
@@ -54,7 +54,7 @@ public class IndexNotificationsJob {
 
 		final SparkConf conf = new SparkConf();
 
-		final String eventsPath = parser.get("workingPath") + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
 		final String index = parser.get("index");
@@ -63,6 +63,18 @@ public class IndexNotificationsJob {
 		final String indexHost = parser.get("esHost");
 		log.info("indexHost: {}", indexHost);
 
+		final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
+		log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
+
+		final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
+		log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
+
+		final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
+		log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
+
+		final String esNodesWanOnly = parser.get("esNodesWanOnly");
+		log.info("esNodesWanOnly: {}", esNodesWanOnly);
+
 		final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
 		log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
@@ -92,10 +104,10 @@ public class IndexNotificationsJob {
 		esCfg.put("es.index.auto.create", "false");
 		esCfg.put("es.nodes", indexHost);
 		esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
-		esCfg.put("es.batch.write.retry.count", "8");
-		esCfg.put("es.batch.write.retry.wait", "60s");
-		esCfg.put("es.batch.size.entries", "200");
-		esCfg.put("es.nodes.wan.only", "true");
+		esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
+		esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
+		esCfg.put("es.batch.size.entries", esBatchSizeEntries);
+		esCfg.put("es.nodes.wan.only", esNodesWanOnly);
 
 		log.info("*** Start indexing");
 		JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
index 762bfbb903..380a689e4d 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java
@@ -36,7 +36,7 @@ public class IndexOnESJob {
 
 		final SparkConf conf = new SparkConf();
 
-		final String eventsPath = parser.get("workingPath") + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
 		final String index = parser.get("index");
@@ -45,6 +45,18 @@ public class IndexOnESJob {
 		final String indexHost = parser.get("esHost");
 		log.info("indexHost: {}", indexHost);
 
+		final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
+		log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
+
+		final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
+		log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
+
+		final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
+		log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
+
+		final String esNodesWanOnly = parser.get("esNodesWanOnly");
+		log.info("esNodesWanOnly: {}", esNodesWanOnly);
+
 		final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
 
 		final JavaRDD inputRdd = ClusterUtils
@@ -53,15 +65,13 @@ public class IndexOnESJob {
 			.readPath(spark, eventsPath, Event.class)
 			.javaRDD();
 
 		final Map<String, String> esCfg = new HashMap<>();
-		// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
-		esCfg.put("es.index.auto.create", "false");
 		esCfg.put("es.nodes", indexHost);
 		esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
-		esCfg.put("es.batch.write.retry.count", "8");
-		esCfg.put("es.batch.write.retry.wait", "60s");
-		esCfg.put("es.batch.size.entries", "200");
-		esCfg.put("es.nodes.wan.only", "true");
+		esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
+		esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
+		esCfg.put("es.batch.size.entries", esBatchSizeEntries);
+		esCfg.put("es.nodes.wan.only", esNodesWanOnly);
 
 		JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
 	}

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
index 39fa76e43d..01778ad74f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep0Job.java
@@ -42,10 +42,10 @@ public class JoinStep0Job {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String joinedEntitiesPath = workingPath + "/joinedEntities_step0";
+		final String joinedEntitiesPath = workingDir + "/joinedEntities_step0";
 		log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
 
 		final SparkConf conf = new SparkConf();
@@ -57,10 +57,10 @@ public class JoinStep0Job {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
 
 			final Dataset<OaBrokerMainEntity> sources = ClusterUtils
-				.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
+				.readPath(spark, workingDir + "/simpleEntities", OaBrokerMainEntity.class);
 
 			final Dataset<RelatedDatasource> typedRels = ClusterUtils
-				.readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class);
+				.readPath(spark, workingDir + "/relatedDatasources", RelatedDatasource.class);
 
 			final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
 				.toColumn();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
index 8e502f7366..82c3619e10 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep1Job.java
@@ -40,10 +40,10 @@ public class JoinStep1Job {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String joinedEntitiesPath = workingPath + "/joinedEntities_step1";
+		final String joinedEntitiesPath = workingDir + "/joinedEntities_step1";
 		log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
 
 		final SparkConf conf = new SparkConf();
@@ -55,10 +55,10 @@ public class JoinStep1Job {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
 
 			final Dataset<OaBrokerMainEntity> sources = ClusterUtils
-				.readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class);
+				.readPath(spark, workingDir + "/joinedEntities_step0", OaBrokerMainEntity.class);
 
 			final Dataset<RelatedProject> typedRels = ClusterUtils
-				.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
+				.readPath(spark, workingDir + "/relatedProjects", RelatedProject.class);
 
 			final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
 				.toColumn();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
index 55ab497f07..bd6135d413 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java
@@ -39,10 +39,10 @@ public class JoinStep2Job {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String joinedEntitiesPath = workingPath + "/joinedEntities_step2";
+		final String joinedEntitiesPath = workingDir + "/joinedEntities_step2";
 		log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
 
 		final SparkConf conf = new SparkConf();
@@ -54,10 +54,10 @@ public class JoinStep2Job {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
 
 			final Dataset<OaBrokerMainEntity> sources = ClusterUtils
-				.readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
+				.readPath(spark, workingDir + "/joinedEntities_step1", OaBrokerMainEntity.class);
 
 			final Dataset<RelatedSoftware> typedRels = ClusterUtils
-				.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class);
+				.readPath(spark, workingDir + "/relatedSoftwares", RelatedSoftware.class);
 
 			final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
 				.toColumn();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
index 4d06f6f133..18e8c00b2a 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep3Job.java
@@ -40,10 +40,10 @@ public class JoinStep3Job {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String joinedEntitiesPath = workingPath + "/joinedEntities_step3";
+		final String joinedEntitiesPath = workingDir + "/joinedEntities_step3";
 		log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
 
 		final SparkConf conf = new SparkConf();
@@ -55,10 +55,10 @@ public class JoinStep3Job {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
 
 			final Dataset<OaBrokerMainEntity> sources = ClusterUtils
-				.readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
+				.readPath(spark, workingDir + "/joinedEntities_step2", OaBrokerMainEntity.class);
 
 			final Dataset<RelatedDataset> typedRels = ClusterUtils
-				.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class);
+				.readPath(spark, workingDir + "/relatedDatasets", RelatedDataset.class);
 
 			final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
 				.toColumn();
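Note: every JoinStep*Job above (and JoinStep4Job below) has the same shape: read the previous step's output, join it with one family of typed relations, group by the entity id and fold the related objects back into the main entity through a custom Aggregator exposed as a TypedColumn. The sketch below reproduces that shape with simplified stand-in beans; the broker's RelatedProjectAggregator and friends are outside this diff, so this is an illustration of the pattern, not the project's code.

import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

public class JoinStepSketch {

	public static class Entity implements java.io.Serializable {
		private String id;
		private int relCount;

		public String getId() { return id; }
		public void setId(final String id) { this.id = id; }
		public int getRelCount() { return relCount; }
		public void setRelCount(final int relCount) { this.relCount = relCount; }
	}

	public static class Rel implements java.io.Serializable {
		private String source;

		public String getSource() { return source; }
		public void setSource(final String source) { this.source = source; }
	}

	// Folds joined (entity, relation) pairs back into the entity; here it simply counts relations.
	public static class RelAggregator extends Aggregator<Tuple2<Entity, Rel>, Entity, Entity> {

		@Override
		public Entity zero() { return new Entity(); }

		@Override
		public Entity reduce(final Entity acc, final Tuple2<Entity, Rel> t) {
			acc.setId(t._1.getId());
			acc.setRelCount(acc.getRelCount() + 1);
			return acc;
		}

		@Override
		public Entity merge(final Entity a, final Entity b) {
			if (a.getId() == null) { a.setId(b.getId()); }
			a.setRelCount(a.getRelCount() + b.getRelCount());
			return a;
		}

		@Override
		public Entity finish(final Entity e) { return e; }

		@Override
		public Encoder<Entity> bufferEncoder() { return Encoders.bean(Entity.class); }

		@Override
		public Encoder<Entity> outputEncoder() { return Encoders.bean(Entity.class); }
	}

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession.builder().master("local[*]").appName("join-step-sketch").getOrCreate();

		final Entity e = new Entity();
		e.setId("result_1");
		final Rel r = new Rel();
		r.setSource("result_1");

		final Dataset<Entity> sources = spark.createDataset(Arrays.asList(e), Encoders.bean(Entity.class));
		final Dataset<Rel> rels = spark.createDataset(Arrays.asList(r, r), Encoders.bean(Rel.class));

		final TypedColumn<Tuple2<Entity, Rel>, Entity> aggr = new RelAggregator().toColumn();

		final Dataset<Entity> joined = sources
			.joinWith(rels, sources.col("id").equalTo(rels.col("source")), "inner")
			.groupByKey((MapFunction<Tuple2<Entity, Rel>, String>) t -> t._1.getId(), Encoders.STRING())
			.agg(aggr)
			.map((MapFunction<Tuple2<String, Entity>, Entity>) t -> t._2, Encoders.bean(Entity.class));

		joined.show(); // result_1 with relCount = 2
		spark.stop();
	}
}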
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
index b53d7e39b9..9655303620 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep4Job.java
@@ -40,10 +40,10 @@ public class JoinStep4Job {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String joinedEntitiesPath = workingPath + "/joinedEntities_step4";
+		final String joinedEntitiesPath = workingDir + "/joinedEntities_step4";
 		log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
 
 		final SparkConf conf = new SparkConf();
@@ -55,10 +55,10 @@ public class JoinStep4Job {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
 
 			final Dataset<OaBrokerMainEntity> sources = ClusterUtils
-				.readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
+				.readPath(spark, workingDir + "/joinedEntities_step3", OaBrokerMainEntity.class);
 
 			final Dataset<RelatedPublication> typedRels = ClusterUtils
-				.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class);
+				.readPath(spark, workingDir + "/relatedPublications", RelatedPublication.class);
 
 			final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
 				.toColumn();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
index e9644122f3..08c74a291b 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java
@@ -55,10 +55,10 @@ public class PartitionEventsByDsIdJob {
 
 		final SparkConf conf = new SparkConf();
 
-		final String eventsPath = parser.get("workingPath") + "/events";
+		final String eventsPath = parser.get("outputDir") + "/events";
 		log.info("eventsPath: {}", eventsPath);
 
-		final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId";
+		final String partitionPath = parser.get("outputDir") + "/eventsByOpendoarId";
 		log.info("partitionPath: {}", partitionPath);
 
 		final String opendoarIds = parser.get("opendoarIds");
@@ -91,6 +91,7 @@ public class PartitionEventsByDsIdJob {
 				.write()
 				.partitionBy("group")
 				.mode(SaveMode.Overwrite)
+				.option("compression", "gzip")
 				.json(partitionPath);
 
 		});
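Note: both event writers now emit gzip-compressed JSON, and the partitioned variant keeps its partitionBy("group") layout. A minimal, self-contained sketch of that write path (paths and the sample datasource id are illustrative):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PartitionedGzipWriteSketch {

	public static class EventRow implements java.io.Serializable {
		private String group;
		private String payload;

		// getters/setters required by Encoders.bean
		public String getGroup() { return group; }
		public void setGroup(final String group) { this.group = group; }
		public String getPayload() { return payload; }
		public void setPayload(final String payload) { this.payload = payload; }
	}

	public static void main(final String[] args) {
		final SparkSession spark = SparkSession.builder().master("local[*]").appName("gzip-write").getOrCreate();

		final EventRow r = new EventRow();
		r.setGroup("opendoar____::1234"); // illustrative datasource id
		r.setPayload("{}");

		final Dataset<EventRow> ds = spark.createDataset(Arrays.asList(r), Encoders.bean(EventRow.class));

		ds
			.write()
			.partitionBy("group")          // one sub-directory per datasource id
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip") // produces .json.gz part files
			.json("/tmp/eventsByOpendoarId");

		spark.stop();
	}
}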
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
index eb9add00de..dc156cbcf9 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareGroupsJob.java
@@ -45,10 +45,10 @@ public class PrepareGroupsJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String groupsPath = workingPath + "/duplicates";
+		final String groupsPath = workingDir + "/duplicates";
 		log.info("groupsPath: {}", groupsPath);
 
 		final SparkConf conf = new SparkConf();
@@ -60,10 +60,10 @@ public class PrepareGroupsJob {
 			final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups");
 
 			final Dataset<OaBrokerMainEntity> results = ClusterUtils
-				.readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
+				.readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class);
 
 			final Dataset<Relation> mergedRels = ClusterUtils
-				.readPath(spark, graphPath + "/relation", Relation.class)
+				.loadRelations(graphPath, spark)
 				.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
 
 			final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
index 0cfc1adcbb..9bdf32a642 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasetsJob.java
@@ -42,10 +42,10 @@ public class PrepareRelatedDatasetsJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String relsPath = workingPath + "/relatedDatasets";
+		final String relsPath = workingDir + "/relatedDatasets";
 		log.info("relsPath: {}", relsPath);
 
 		final SparkConf conf = new SparkConf();
@@ -62,7 +62,7 @@ public class PrepareRelatedDatasetsJob {
 				.map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class));
 
 			final Dataset<Relation> rels = ClusterUtils
-				.readPath(spark, graphPath + "/relation", Relation.class)
+				.loadRelations(graphPath, spark)
 				.filter(r -> r.getDataInfo().getDeletedbyinference())
 				.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
 				.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
@@ -72,7 +72,8 @@ public class PrepareRelatedDatasetsJob {
 			final Dataset<RelatedDataset> dataset = rels
 				.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
 				.map(t -> {
-					final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
+					final RelatedDataset rel = new RelatedDataset(t._1.getSource(),
+						t._2);
 					rel.getRelDataset().setRelType(t._1.getRelClass());
 					return rel;
 				}, Encoders.bean(RelatedDataset.class));

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
index 166372a7f4..0c23181274 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedDatasourcesJob.java
@@ -48,10 +48,10 @@ public class PrepareRelatedDatasourcesJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String relsPath = workingPath + "/relatedDatasources";
+		final String relsPath = workingDir + "/relatedDatasources";
 		log.info("relsPath: {}", relsPath);
 
 		final SparkConf conf = new SparkConf();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
index e988366c8c..9498c0f337 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedProjectsJob.java
@@ -44,10 +44,10 @@ public class PrepareRelatedProjectsJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String relsPath = workingPath + "/relatedProjects";
+		final String relsPath = workingDir + "/relatedProjects";
 		log.info("relsPath: {}", relsPath);
 
 		final SparkConf conf = new SparkConf();
@@ -64,7 +64,7 @@ public class PrepareRelatedProjectsJob {
 				.map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class));
 
 			final Dataset<Relation> rels = ClusterUtils
-				.readPath(spark, graphPath + "/relation", Relation.class)
+				.loadRelations(graphPath, spark)
 				.filter(r -> r.getDataInfo().getDeletedbyinference())
 				.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
 				.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
index 724acc4dce..8270500fd4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedPublicationsJob.java
@@ -43,10 +43,10 @@ public class PrepareRelatedPublicationsJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String relsPath = workingPath + "/relatedPublications";
+		final String relsPath = workingDir + "/relatedPublications";
 		log.info("relsPath: {}", relsPath);
 
 		final SparkConf conf = new SparkConf();
@@ -65,7 +65,7 @@ public class PrepareRelatedPublicationsJob {
 					Encoders.bean(OaBrokerRelatedPublication.class));
 
 			final Dataset<Relation> rels = ClusterUtils
-				.readPath(spark, graphPath + "/relation", Relation.class)
+				.loadRelations(graphPath, spark)
 				.filter(r -> r.getDataInfo().getDeletedbyinference())
 				.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
 				.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
@@ -75,7 +75,8 @@ public class PrepareRelatedPublicationsJob {
 			final Dataset<RelatedPublication> dataset = rels
 				.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
 				.map(t -> {
-					final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
+					final RelatedPublication rel = new RelatedPublication(
+						t._1.getSource(), t._2);
 					rel.getRelPublication().setRelType(t._1.getRelClass());
 					return rel;
 				}, Encoders.bean(RelatedPublication.class));

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
index d15565d0d7..16b4507338 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareRelatedSoftwaresJob.java
@@ -44,10 +44,10 @@ public class PrepareRelatedSoftwaresJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String relsPath = workingPath + "/relatedSoftwares";
+		final String relsPath = workingDir + "/relatedSoftwares";
 		log.info("relsPath: {}", relsPath);
 
 		final SparkConf conf = new SparkConf();
@@ -64,7 +64,7 @@ public class PrepareRelatedSoftwaresJob {
 				.map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class));
 
 			final Dataset<Relation> rels = ClusterUtils
-				.readPath(spark, graphPath + "/relation", Relation.class)
+				.loadRelations(graphPath, spark)
 				.filter(r -> r.getDataInfo().getDeletedbyinference())
 				.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
 				.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
index d3c7113ec0..cf44506038 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PrepareSimpleEntititiesJob.java
@@ -44,10 +44,10 @@ public class PrepareSimpleEntititiesJob {
 		final String graphPath = parser.get("graphPath");
 		log.info("graphPath: {}", graphPath);
 
-		final String workingPath = parser.get("workingPath");
-		log.info("workingPath: {}", workingPath);
+		final String workingDir = parser.get("workingDir");
+		log.info("workingDir: {}", workingDir);
 
-		final String simpleEntitiesPath = workingPath + "/simpleEntities";
+		final String simpleEntitiesPath = workingDir + "/simpleEntities";
 		log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
 
 		final SparkConf conf = new SparkConf();

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
index d8b8dd8079..9ce64f6bde 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ClusterUtils.java
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 
 public class ClusterUtils {
@@ -30,6 +31,16 @@ public class ClusterUtils {
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
 
+	public static Dataset<Relation> loadRelations(final String graphPath, final SparkSession spark) {
+		return ClusterUtils
+			.readPath(spark, graphPath + "/relation", Relation.class)
+			.map(r -> {
+				r.setSource(ConversionUtils.cleanOpenaireId(r.getSource()));
+				r.setTarget(ConversionUtils.cleanOpenaireId(r.getTarget()));
+				return r;
+			}, Encoders.bean(Relation.class));
+	}
+
 	public static Dataset readPath(
 		final SparkSession spark,
 		final String inputPath,
@@ -67,6 +78,7 @@ public class ClusterUtils {
 			.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
 			.write()
 			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
 			.json(path);
 	}
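Note: loadRelations centralizes two concerns that each Prepare*Job previously handled on its own: reading graphPath + "/relation" and normalizing both endpoints with cleanOpenaireId. A standalone sketch of the same idea, using a minimal Relation bean because the OpenAIRE model classes are outside this diff:

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class LoadRelationsSketch {

	public static class Relation implements java.io.Serializable {
		private String source;
		private String target;

		public String getSource() { return source; }
		public void setSource(final String source) { this.source = source; }
		public String getTarget() { return target; }
		public void setTarget(final String target) { this.target = target; }
	}

	// Same rule as ConversionUtils.cleanOpenaireId: drop everything up to the first "|".
	static String cleanOpenaireId(final String id) {
		return id.contains("|") ? StringUtils.substringAfter(id, "|") : id;
	}

	// Reads relations and normalizes both endpoints, mirroring ClusterUtils.loadRelations.
	static Dataset<Relation> loadRelations(final String graphPath, final SparkSession spark) {
		return spark
			.read()
			.json(graphPath + "/relation")
			.as(Encoders.bean(Relation.class))
			.map((MapFunction<Relation, Relation>) r -> {
				r.setSource(cleanOpenaireId(r.getSource()));
				r.setTarget(cleanOpenaireId(r.getTarget()));
				return r;
			}, Encoders.bean(Relation.class));
	}
}

Callers can then chain the usual filters, e.g. loadRelations(graphPath, spark).filter(...), exactly as PrepareGroupsJob does above.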
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
index 053627a5fb..ecbfd821ea 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java
@@ -74,7 +74,7 @@ public class ConversionUtils {
 		}
 
 		final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset();
-		res.setOpenaireId(d.getId());
+		res.setOpenaireId(cleanOpenaireId(d.getId()));
 		res.setOriginalId(first(d.getOriginalId()));
 		res.setTitle(structPropValue(d.getTitle()));
 		res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
@@ -89,7 +89,7 @@ public class ConversionUtils {
 		}
 
 		final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication();
-		res.setOpenaireId(p.getId());
+		res.setOpenaireId(cleanOpenaireId(p.getId()));
 		res.setOriginalId(first(p.getOriginalId()));
 		res.setTitle(structPropValue(p.getTitle()));
 		res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
@@ -106,7 +106,7 @@ public class ConversionUtils {
 
 		final OaBrokerMainEntity res = new OaBrokerMainEntity();
 
-		res.setOpenaireId(result.getId());
+		res.setOpenaireId(cleanOpenaireId(result.getId()));
 		res.setOriginalId(first(result.getOriginalId()));
 		res.setTypology(classId(result.getResulttype()));
 		res.setTitles(structPropList(result.getTitle()));
@@ -129,6 +129,10 @@ public class ConversionUtils {
 		return res;
 	}
 
+	public static String cleanOpenaireId(final String id) {
+		return id.contains("|") ? StringUtils.substringAfter(id, "|") : id;
+	}
+
 	private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) {
 		if (author == null) {
 			return null;
 		}
@@ -188,7 +192,7 @@ public class ConversionUtils {
 		}
 
 		final OaBrokerProject res = new OaBrokerProject();
-		res.setOpenaireId(p.getId());
+		res.setOpenaireId(cleanOpenaireId(p.getId()));
 		res.setTitle(fieldValue(p.getTitle()));
 		res.setAcronym(fieldValue(p.getAcronym()));
 		res.setCode(fieldValue(p.getCode()));
@@ -214,7 +218,7 @@ public class ConversionUtils {
 		}
 
 		final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware();
-		res.setOpenaireId(sw.getId());
+		res.setOpenaireId(cleanOpenaireId(sw.getId()));
 		res.setName(structPropValue(sw.getTitle()));
 		res.setDescription(fieldValue(sw.getDescription()));
 		res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
@@ -230,7 +234,7 @@ public class ConversionUtils {
 
 		final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource();
 		res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname())));
-		res.setOpenaireId(ds.getId());
+		res.setOpenaireId(cleanOpenaireId(ds.getId()));
 		res.setType(classId(ds.getDatasourcetype()));
 		return res;
 	}

diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java
index 75c4625ce4..c693be93c5 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/DatasourceRelationsAccumulator.java
@@ -59,9 +59,18 @@ public class DatasourceRelationsAccumulator implements Serializable {
 		final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
 		collectedFromSet
 			.stream()
-			.map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL))
+			.map(
+				s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s),
+					BrokerConstants.COLLECTED_FROM_REL))
 			.forEach(res::addTuple);
-		hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
+
+		hostedBySet
+			.stream()
+			.map(
+				s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s),
+					BrokerConstants.HOSTED_BY_REL))
+			.forEach(res::addTuple);
+
 		return res;
 	}
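Note: cleanOpenaireId strips the entity-type prefix that raw graph identifiers carry before the "|" separator (e.g. "50|" for results), so broker ids and relation endpoints join consistently. A tiny worked example (the sample ids are illustrative):

import org.apache.commons.lang3.StringUtils;

public class CleanOpenaireIdExample {

	static String cleanOpenaireId(final String id) {
		return id.contains("|") ? StringUtils.substringAfter(id, "|") : id;
	}

	public static void main(final String[] args) {
		// prefix before "|" is dropped
		System.out.println(cleanOpenaireId("50|doi_________::0123456789abcdef")); // doi_________::0123456789abcdef
		// ids without a "|" pass through unchanged
		System.out.println(cleanOpenaireId("already-clean-id"));
	}
}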
diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
index 1ab56cc346..103751f954 100644
--- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
+++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java
@@ -76,6 +76,7 @@ public class EventFinder {
 		final Set<String> dsIdWhitelist,
 		final Set<String> dsIdBlacklist,
 		final Set<String> dsTypeWhitelist,
+		final Set<String> topicWhitelist,
 		final Map<String, LongAccumulator> accumulators) {
 
 		final List<UpdateInfo<?>> list = new ArrayList<>();
@@ -84,7 +85,13 @@ public class EventFinder {
 			for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
 				if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
 					for (final UpdateMatcher<?> matcher : matchers) {
-						list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
+						for (final UpdateInfo<?> info : matcher
+							.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators)) {
+							if (topicWhitelist == null || topicWhitelist.isEmpty()
+								|| topicWhitelist.contains(info.getTopic().getPath())) {
+								list.add(info);
+							}
+						}
 					}
 				}
 			}

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json
new file mode 100644
index 0000000000..2584b78fcc
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/check_duplicates.json
@@ -0,0 +1,9 @@
+[
+
+  {
+    "paramName": "o",
+    "paramLongName": "outputDir",
+    "paramDescription": "the path where the data are stored",
+    "paramRequired": true
+  }
+]

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json
index adee1888a1..0d942cd59f 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/common_params.json
@@ -7,7 +7,7 @@
   },
   {
     "paramName": "o",
-    "paramLongName": "workingPath",
+    "paramLongName": "workingDir",
     "paramDescription": "the path where the temporary data will be stored",
     "paramRequired": true
   }
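Note: the new whitelist check in EventFinder treats a null or empty set as "accept every topic", so deployments that never set topicWhitelist keep their current behavior. A compact illustration of that predicate (topic paths are illustrative):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class TopicWhitelistExample {

	// Mirrors the EventFinder check: an unset or empty whitelist accepts every topic.
	static boolean acceptTopic(final Set<String> topicWhitelist, final String topicPath) {
		return topicWhitelist == null || topicWhitelist.isEmpty() || topicWhitelist.contains(topicPath);
	}

	public static void main(final String[] args) {
		final Set<String> whitelist = new HashSet<>(Arrays.asList("ENRICH/MISSING/PID"));

		System.out.println(acceptTopic(whitelist, "ENRICH/MISSING/PID"));      // true
		System.out.println(acceptTopic(whitelist, "ENRICH/MISSING/ABSTRACT")); // false
		System.out.println(acceptTopic(new HashSet<>(), "ENRICH/MISSING/ABSTRACT")); // true: empty = no filter
	}
}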
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
index 14e33b0916..d06eec87e3 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml
@@ -6,7 +6,7 @@
 			<description>the path where the graph is stored</description>
 		</property>
 		<property>
-			<name>workingPath</name>
+			<name>outputDir</name>
 			<description>the path where the the generated data will be stored</description>
 		</property>
@@ -24,6 +24,11 @@
 			<description>a black list (comma separeted, - for empty list) of datasource ids</description>
 		</property>
+		<property>
+			<name>topicWhitelist</name>
+			<value>*</value>
+			<description>a white list (comma separated, * for all) of topics</description>
+		</property>
 		<property>
 			<name>esEventIndexName</name>
 			<description>the elasticsearch index name for events</description>
@@ -36,6 +41,26 @@
 			<name>esIndexHost</name>
 			<description>the elasticsearch host</description>
 		</property>
+		<property>
+			<name>esBatchWriteRetryCount</name>
+			<value>8</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esBatchWriteRetryWait</name>
+			<value>60s</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esBatchSizeEntries</name>
+			<value>200</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esNodesWanOnly</name>
+			<value>true</value>
+			<description>an ES configuration property</description>
+		</property>
 		<property>
 			<name>maxIndexedEventsForDsAndTopic</name>
 			<description>the max number of events for each couple (ds/topic)</description>
@@ -111,15 +136,15 @@
-	<start to="ensure_working_path"/>
+	<start to="ensure_working_dir"/>
 
 	<kill name="Kill">
 		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 	</kill>
 
-	<action name="ensure_working_path">
+	<action name="ensure_working_dir">
 		<fs>
-			<mkdir path='${workingPath}'/>
+			<mkdir path='${workingDir}'/>
 		</fs>
@@ -152,7 +177,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -176,7 +201,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -201,7 +226,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -225,7 +250,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -249,7 +274,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -273,7 +298,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -299,7 +324,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -323,7 +348,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -347,7 +372,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -371,7 +396,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -395,7 +420,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -419,7 +444,7 @@
 				--conf spark.sql.shuffle.partitions=3840
 
 			<arg>--graphPath</arg><arg>${graphInputPath}</arg>
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
@@ -442,10 +467,12 @@
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.shuffle.partitions=3840
 
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
+			<arg>--outputDir</arg><arg>${outputDir}</arg>
 			<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
 			<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
 			<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
+			<arg>--topicWhitelist</arg><arg>${topicWhitelist}</arg>
@@ -468,9 +495,13 @@
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.shuffle.partitions=3840
 
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--outputDir</arg><arg>${outputDir}</arg>
 			<arg>--index</arg><arg>${esEventIndexName}</arg>
 			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
+			<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
+			<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
+			<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
 			<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
 			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
@@ -495,9 +526,13 @@
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.shuffle.partitions=3840
 
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--outputDir</arg><arg>${outputDir}</arg>
 			<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
 			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
+			<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
+			<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
+			<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
 			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
@@ -521,7 +556,7 @@
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.shuffle.partitions=3840
 
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--outputDir</arg><arg>${outputDir}</arg>
 			<arg>--dbUrl</arg><arg>${brokerDbUrl}</arg>
 			<arg>--dbUser</arg><arg>${brokerDbUser}</arg>
 			<arg>--dbPassword</arg><arg>${brokerDbPassword}</arg>

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
index bab8081932..e803bb5b90 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json
@@ -1,7 +1,13 @@
 [
+  {
+    "paramName": "wp",
+    "paramLongName": "workingDir",
+    "paramDescription": "the path where the temporary data are stored",
+    "paramRequired": true
+  },
   {
     "paramName": "o",
-    "paramLongName": "workingPath",
+    "paramLongName": "outputDir",
     "paramDescription": "the path where the generated events will be stored",
     "paramRequired": true
   },
@@ -22,5 +28,11 @@
     "paramLongName": "datasourceIdBlacklist",
     "paramDescription": "a black list (comma separeted, - for empty list) of datasource ids",
     "paramRequired": true
+  },
+  {
+    "paramName": "topicWhitelist",
+    "paramLongName": "topicWhitelist",
+    "paramDescription": "a white list (comma separated, * for all) of topics",
+    "paramRequired": true
   }
 ]

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json
index ac1dbf7867..f7e072d0fb 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json
@@ -1,8 +1,8 @@
 [
   {
     "paramName": "o",
-    "paramLongName": "workingPath",
-    "paramDescription": "the workinh path",
+    "paramLongName": "outputDir",
+    "paramDescription": "the data path",
     "paramRequired": true
   },
   {
@@ -16,5 +16,29 @@
     "paramLongName": "esHost",
     "paramDescription": "the ES host",
     "paramRequired": true
+  },
+  {
+    "paramName": "esBatchWriteRetryCount",
+    "paramLongName": "esBatchWriteRetryCount",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esBatchWriteRetryWait",
+    "paramLongName": "esBatchWriteRetryWait",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esBatchSizeEntries",
+    "paramLongName": "esBatchSizeEntries",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esNodesWanOnly",
+    "paramLongName": "esNodesWanOnly",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
   }
 ]

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json
index 4921bc03e2..0046490bbb 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json
@@ -1,8 +1,8 @@
 [
   {
     "paramName": "o",
-    "paramLongName": "workingPath",
-    "paramDescription": "the workinh path",
+    "paramLongName": "outputDir",
+    "paramDescription": "the path where the generated data are stored",
     "paramRequired": true
   },
   {
@@ -16,7 +16,31 @@
     "paramLongName": "esHost",
     "paramDescription": "the ES host",
     "paramRequired": true
+  },
+  {
+    "paramName": "esBatchWriteRetryCount",
+    "paramLongName": "esBatchWriteRetryCount",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
   },
+  {
+    "paramName": "esBatchWriteRetryWait",
+    "paramLongName": "esBatchWriteRetryWait",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esBatchSizeEntries",
+    "paramLongName": "esBatchSizeEntries",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esNodesWanOnly",
+    "paramLongName": "esNodesWanOnly",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
   {
     "paramName": "n",
     "paramLongName": "maxEventsForTopic",
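Note: all of these jobs share the same bootstrap: a JSON file like the ones above declares the accepted options, ArgumentApplicationParser validates argv against it, and parser.get(...) returns the raw string values. The sketch below uses only the calls that appear in this diff (the constructor taking the JSON definition, parseArgument and get); the inlined JSON literal is an abbreviated stand-in for the resource file.

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class ParserBootstrapSketch {

	public static void main(final String[] args) throws Exception {
		// Normally loaded with IOUtils.toString(SomeJob.class.getResourceAsStream(".../index_es.json"))
		final String defs = "["
			+ "{\"paramName\": \"o\", \"paramLongName\": \"outputDir\", "
			+ "\"paramDescription\": \"the data path\", \"paramRequired\": true}"
			+ "]";

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(defs);
		parser.parseArgument(new String[] { "--outputDir", "/data/broker" });

		System.out.println(parser.get("outputDir")); // /data/broker
	}
}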
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json
index 5eea894c8e..370b484115 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json
@@ -1,8 +1,8 @@
 [
   {
     "paramName": "o",
-    "paramLongName": "workingPath",
-    "paramDescription": "the workinh path",
+    "paramLongName": "outputDir",
+    "paramDescription": "the dir that contains the events folder",
     "paramRequired": true
   },
   {
@@ -17,6 +17,30 @@
     "paramDescription": "the ES host",
     "paramRequired": true
   },
+  {
+    "paramName": "esBatchWriteRetryCount",
+    "paramLongName": "esBatchWriteRetryCount",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esBatchWriteRetryWait",
+    "paramLongName": "esBatchWriteRetryWait",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esBatchSizeEntries",
+    "paramLongName": "esBatchSizeEntries",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
+  {
+    "paramName": "esNodesWanOnly",
+    "paramLongName": "esNodesWanOnly",
+    "paramDescription": "an ES configuration property",
+    "paramRequired": true
+  },
   {
     "paramName": "broker",
     "paramLongName": "brokerApiBaseUrl",

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml
index 879c0d3490..248326d57e 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml
@@ -6,8 +6,8 @@
 			<description>the path where the graph is stored</description>
 		</property>
 		<property>
-			<name>workingPath</name>
-			<description>the path where the the generated data will be stored</description>
+			<name>outputDir</name>
+			<description>the path where the generated data are stored</description>
 		</property>
 		<property>
 			<name>datasourceIdWhitelist</name>
@@ -36,6 +36,26 @@
 			<name>esIndexHost</name>
 			<description>the elasticsearch host</description>
 		</property>
+		<property>
+			<name>esBatchWriteRetryCount</name>
+			<value>8</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esBatchWriteRetryWait</name>
+			<value>60s</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esBatchSizeEntries</name>
+			<value>200</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esNodesWanOnly</name>
+			<value>true</value>
+			<description>an ES configuration property</description>
+		</property>
 		<property>
 			<name>maxIndexedEventsForDsAndTopic</name>
 			<description>the max number of events for each couple (ds/topic)</description>
@@ -122,9 +142,13 @@
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.shuffle.partitions=3840
 
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--outputDir</arg><arg>${outputDir}</arg>
 			<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
 			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
+			<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
+			<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
+			<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
 			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json
index 10ba926abf..12cd6a391c 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/od_partitions_params.json
@@ -1,8 +1,8 @@
 [
   {
     "paramName": "o",
-    "paramLongName": "workingPath",
-    "paramDescription": "the path where the temporary data will be stored",
+    "paramLongName": "outputDir",
+    "paramDescription": "the path where the data will be stored",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml
index dba3c9f73b..83fe47c758 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/opendoarPartition/oozie_app/workflow.xml
@@ -6,7 +6,7 @@
 			<description>the opendoar IDs whitelist (comma separated)</description>
 		</property>
 		<property>
-			<name>workingPath</name>
+			<name>outputDir</name>
 			<description>the path where the the generated data will be stored</description>
 		</property>
@@ -87,7 +87,7 @@
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 				--conf spark.sql.shuffle.partitions=3840
 
-			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--workingDir</arg><arg>${workingDir}</arg>
 			<arg>--opendoarIds</arg><arg>${opendoarIds}</arg>

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml
new file mode 100644
index 0000000000..2e0ed9aeea
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/config-default.xml
@@ -0,0 +1,18 @@
+<configuration>
+	<property>
+		<name>jobTracker</name>
+		<value>yarnRM</value>
+	</property>
+	<property>
+		<name>nameNode</name>
+		<value>hdfs://nameservice1</value>
+	</property>
+	<property>
+		<name>oozie.use.system.libpath</name>
+		<value>true</value>
+	</property>
+	<property>
+		<name>oozie.action.sharelib.for.spark</name>
+		<value>spark2</value>
+	</property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml
new file mode 100644
index 0000000000..9095004ad5
--- /dev/null
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml
@@ -0,0 +1,140 @@
+<workflow-app name="reindex_event_subset" xmlns="uri:oozie:workflow:0.5">
+
+	<parameters>
+		<property>
+			<name>outputDir</name>
+			<description>the path where the generated data will be stored</description>
+		</property>
+		<property>
+			<name>esEventIndexName</name>
+			<description>the elasticsearch index name for events</description>
+		</property>
+		<property>
+			<name>esIndexHost</name>
+			<description>the elasticsearch host</description>
+		</property>
+		<property>
+			<name>esBatchWriteRetryCount</name>
+			<value>8</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esBatchWriteRetryWait</name>
+			<value>60s</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esBatchSizeEntries</name>
+			<value>200</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>esNodesWanOnly</name>
+			<value>true</value>
+			<description>an ES configuration property</description>
+		</property>
+		<property>
+			<name>maxIndexedEventsForDsAndTopic</name>
+			<description>the max number of events for each couple (ds/topic)</description>
+		</property>
+		<property>
+			<name>brokerApiBaseUrl</name>
+			<description>the url of the broker service api</description>
+		</property>
+		<property>
+			<name>sparkDriverMemory</name>
+			<description>memory for driver process</description>
+		</property>
+		<property>
+			<name>sparkExecutorMemory</name>
+			<description>memory for individual executor</description>
+		</property>
+		<property>
+			<name>sparkExecutorCores</name>
+			<description>number of cores used by single executor</description>
+		</property>
+		<property>
+			<name>oozieActionShareLibForSpark2</name>
+			<description>oozie action sharelib for spark 2.*</description>
+		</property>
+		<property>
+			<name>spark2ExtraListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+			<description>spark 2.* extra listeners classname</description>
+		</property>
+		<property>
+			<name>spark2SqlQueryExecutionListeners</name>
+			<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+			<description>spark 2.* sql query execution listeners classname</description>
+		</property>
+		<property>
+			<name>spark2YarnHistoryServerAddress</name>
+			<description>spark 2.* yarn history server address</description>
+		</property>
+		<property>
+			<name>spark2EventLogDir</name>
+			<description>spark 2.* event log dir location</description>
+		</property>
+	</parameters>
+
+	<global>
+		<job-tracker>${jobTracker}</job-tracker>
+		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>mapreduce.job.queuename</name>
+				<value>${queueName}</value>
+			</property>
+			<property>
+				<name>oozie.launcher.mapred.job.queue.name</name>
+				<value>${oozieLauncherQueueName}</value>
+			</property>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
+	</global>
+
+	<start to="index_event_subset"/>
+
+	<kill name="Kill">
+		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+	</kill>
+
+	<action name="index_event_subset">
+		<spark xmlns="uri:oozie:spark-action:0.2">
+			<master>yarn</master>
+			<mode>cluster</mode>
+			<name>IndexEventSubsetOnESJob</name>
+			<class>eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob</class>
+			<jar>dhp-broker-events-${projectVersion}.jar</jar>
+			<spark-opts>
+				--executor-memory=${sparkExecutorMemory}
+				--driver-memory=${sparkDriverMemory}
+				--conf spark.dynamicAllocation.maxExecutors="8"
+				--conf spark.extraListeners=${spark2ExtraListeners}
+				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
+			</spark-opts>
+			<arg>--outputDir</arg><arg>${outputDir}</arg>
+			<arg>--index</arg><arg>${esEventIndexName}</arg>
+			<arg>--esHost</arg><arg>${esIndexHost}</arg>
+			<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
+			<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
+			<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
+			<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
+			<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
+			<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
+		</spark>
+		<ok to="End"/>
+		<error to="Kill"/>
+	</action>
+
+	<end name="End"/>
+
+</workflow-app>
\ No newline at end of file

diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json
index 15d7d251f6..2388b1c1f4 100644
--- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json
+++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/stats_params.json
@@ -1,8 +1,8 @@
 [
   {
-    "paramName": "wp",
-    "paramLongName": "workingPath",
-    "paramDescription": "the working path",
+    "paramName": "o",
+    "paramLongName": "outputDir",
+    "paramDescription": "the path where generated data are stored",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index b6210013cb..3adbd244c6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -23,7 +23,15 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
 import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
-import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.asString;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.dataInfo;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.journal;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.listFields;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.listKeyValues;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.qualifier;
+import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -462,44 +470,48 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 			return Arrays.asList(r);
 		} else {
+			final String validationDate = rs.getString("curation_date");
+
 			final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false);
 			final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false);
+
 			final Relation r1 = new Relation();
 			final Relation r2 = new Relation();
 
-			if (rs.getString(SOURCE_TYPE).equals("project")) {
-				r1.setCollectedfrom(collectedFrom);
-				r1.setRelType(RESULT_PROJECT);
-				r1.setSubRelType(OUTCOME);
-				r1.setRelClass(PRODUCES);
-
-				r2.setCollectedfrom(collectedFrom);
-				r2.setRelType(RESULT_PROJECT);
-				r2.setSubRelType(OUTCOME);
-				r2.setRelClass(IS_PRODUCED_BY);
-			} else {
-				r1.setCollectedfrom(collectedFrom);
-				r1.setRelType(RESULT_RESULT);
-				r1.setSubRelType(RELATIONSHIP);
-				r1.setRelClass(IS_RELATED_TO);
-
-				r2.setCollectedfrom(collectedFrom);
-				r2.setRelType(RESULT_RESULT);
-				r2.setSubRelType(RELATIONSHIP);
-				r2.setRelClass(IS_RELATED_TO);
-			}
-
+			r1.setValidated(true);
+			r1.setValidationDate(validationDate);
+			r1.setCollectedfrom(collectedFrom);
 			r1.setSource(sourceId);
 			r1.setTarget(targetId);
 			r1.setDataInfo(info);
 			r1.setLastupdatetimestamp(lastUpdateTimestamp);
 
+			r2.setValidationDate(validationDate);
+			r2.setValidated(true);
+			r2.setCollectedfrom(collectedFrom);
 			r2.setSource(targetId);
 			r2.setTarget(sourceId);
 			r2.setDataInfo(info);
 			r2.setLastupdatetimestamp(lastUpdateTimestamp);
 
+			if (rs.getString(SOURCE_TYPE).equals("project")) {
+				r1.setRelType(RESULT_PROJECT);
+				r1.setSubRelType(OUTCOME);
+				r1.setRelClass(PRODUCES);
+
+				r2.setRelType(RESULT_PROJECT);
+				r2.setSubRelType(OUTCOME);
+				r2.setRelClass(IS_PRODUCED_BY);
+			} else {
+				r1.setRelType(RESULT_RESULT);
+				r1.setSubRelType(RELATIONSHIP);
+				r1.setRelClass(IS_RELATED_TO);
+
+				r2.setRelType(RESULT_RESULT);
+				r2.setSubRelType(RELATIONSHIP);
+				r2.setRelClass(IS_RELATED_TO);
+			}
+
 			return Arrays.asList(r1, r2);
 		}

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryClaims.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryClaims.sql
index 0390c11aa3..f912d3ce92 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryClaims.sql
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryClaims.sql
@@ -1 +1 @@
-SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE;
\ No newline at end of file
+SELECT source_type, source_id, target_type, target_id, semantics, curation_date::text FROM claim WHERE approved=TRUE;
\ No newline at end of file
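Note: the claim import now marks both directions of each relation as validated and stamps them with the claim's curation date, pulled in by the new curation_date column of queryClaims.sql. The sketch below shows that symmetric construction in isolation, using a stripped-down Relation bean in place of the OpenAIRE model class; the relType/relClass string values are illustrative stand-ins for the ModelConstants used in the diff, and the sample ids are made up.

import java.util.Arrays;
import java.util.List;

public class ValidatedClaimPairSketch {

	public static class Relation {
		String source, target, relType, subRelType, relClass, validationDate;
		boolean validated;

		@Override
		public String toString() {
			return source + " -[" + relClass + ", validated=" + validated + " on " + validationDate + "]-> " + target;
		}
	}

	// Builds the two mirrored relations emitted per approved claim.
	static List<Relation> claimPair(final String sourceId, final String targetId, final boolean sourceIsProject,
		final String validationDate) {
		final Relation r1 = new Relation();
		final Relation r2 = new Relation();

		// Shared bookkeeping: both directions carry the validation flag and date.
		r1.validated = true; r1.validationDate = validationDate; r1.source = sourceId; r1.target = targetId;
		r2.validated = true; r2.validationDate = validationDate; r2.source = targetId; r2.target = sourceId;

		// Only the semantics depend on the source type, so they are set last.
		if (sourceIsProject) {
			r1.relType = "resultProject"; r1.subRelType = "outcome"; r1.relClass = "produces";
			r2.relType = "resultProject"; r2.subRelType = "outcome"; r2.relClass = "isProducedBy";
		} else {
			r1.relType = "resultResult"; r1.subRelType = "relationship"; r1.relClass = "isRelatedTo";
			r2.relType = "resultResult"; r2.subRelType = "relationship"; r2.relClass = "isRelatedTo";
		}
		return Arrays.asList(r1, r2);
	}

	public static void main(final String[] args) {
		claimPair("40|corda__h2020::1234", "50|doi_________::abcd", true, "2020-11-03")
			.forEach(System.out::println);
	}
}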