diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index 72efc9e6b3..e18a7ef560 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -55,6 +55,18 @@ public class IndexEventSubsetJob { final String indexHost = parser.get("esHost"); log.info("indexHost: {}", indexHost); + final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount"); + log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount); + + final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait"); + log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait); + + final String esBatchSizeEntries = parser.get("esBatchSizeEntries"); + log.info("esBatchSizeEntries: {}", esBatchSizeEntries); + + final String esNodesWanOnly = parser.get("esNodesWanOnly"); + log.info("esNodesWanOnly: {}", esNodesWanOnly); + final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); log.info("maxEventsForTopic: {}", maxEventsForTopic); @@ -86,10 +98,10 @@ public class IndexEventSubsetJob { esCfg.put("es.index.auto.create", "false"); esCfg.put("es.nodes", indexHost); esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", "8"); - esCfg.put("es.batch.write.retry.wait", "60s"); - esCfg.put("es.batch.size.entries", "200"); - esCfg.put("es.nodes.wan.only", "true"); + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 29fc72d04b..75f4eb066d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -63,6 +63,18 @@ public class IndexNotificationsJob { final String indexHost = parser.get("esHost"); log.info("indexHost: {}", indexHost); + final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount"); + log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount); + + final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait"); + log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait); + + final String esBatchSizeEntries = parser.get("esBatchSizeEntries"); + log.info("esBatchSizeEntries: {}", esBatchSizeEntries); + + final String esNodesWanOnly = parser.get("esNodesWanOnly"); + log.info("esNodesWanOnly: {}", esNodesWanOnly); + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); @@ -92,10 +104,10 @@ public class IndexNotificationsJob { esCfg.put("es.index.auto.create", "false"); esCfg.put("es.nodes", indexHost); esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", "8"); - esCfg.put("es.batch.write.retry.wait", "60s"); - esCfg.put("es.batch.size.entries", "200"); - esCfg.put("es.nodes.wan.only", "true"); + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java index 006cde48cb..380a689e4d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexOnESJob.java @@ -45,6 +45,18 @@ public class IndexOnESJob { final String indexHost = parser.get("esHost"); log.info("indexHost: {}", indexHost); + final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount"); + log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount); + + final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait"); + log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait); + + final String esBatchSizeEntries = parser.get("esBatchSizeEntries"); + log.info("esBatchSizeEntries: {}", esBatchSizeEntries); + + final String esNodesWanOnly = parser.get("esNodesWanOnly"); + log.info("esNodesWanOnly: {}", esNodesWanOnly); + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final JavaRDD inputRdd = ClusterUtils @@ -53,15 +65,13 @@ public class IndexOnESJob { .javaRDD(); final Map esCfg = new HashMap<>(); - // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); - esCfg.put("es.index.auto.create", "false"); esCfg.put("es.nodes", indexHost); esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", "8"); - esCfg.put("es.batch.write.retry.wait", "60s"); - esCfg.put("es.batch.size.entries", "200"); - esCfg.put("es.nodes.wan.only", "true"); + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 250f928f8e..d06eec87e3 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -41,6 +41,26 @@ esIndexHost the elasticsearch host + + esBatchWriteRetryCount + 8 + an ES configuration property + + + esBatchWriteRetryWait + 60s + an ES configuration property + + + esBatchSizeEntries + 200 + an ES configuration property + + + esNodesWanOnly + true + an ES configuration property + maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) @@ -478,6 +498,10 @@ --outputDir${outputDir} --index${esEventIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl} @@ -505,6 +529,10 @@ --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json index 079709ad89..f7e072d0fb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_es.json @@ -16,5 +16,29 @@ "paramLongName": "esHost", "paramDescription": "the ES host", "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryCount", + "paramLongName": "esBatchWriteRetryCount", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryWait", + "paramLongName": "esBatchWriteRetryWait", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchSizeEntries", + "paramLongName": "esBatchSizeEntries", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esNodesWanOnly", + "paramLongName": "esNodesWanOnly", + "paramDescription": "an ES configuration property", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json index 441249661e..0046490bbb 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -16,7 +16,31 @@ "paramLongName": "esHost", "paramDescription": "the ES host", "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryCount", + "paramLongName": "esBatchWriteRetryCount", + "paramDescription": "an ES configuration property", + "paramRequired": true }, + { + "paramName": "esBatchWriteRetryWait", + "paramLongName": "esBatchWriteRetryWait", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchSizeEntries", + "paramLongName": "esBatchSizeEntries", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esNodesWanOnly", + "paramLongName": "esNodesWanOnly", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, { "paramName": "n", "paramLongName": "maxEventsForTopic", diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json index 63e9b12637..370b484115 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_notifications.json @@ -17,6 +17,30 @@ "paramDescription": "the ES host", "paramRequired": true }, + { + "paramName": "esBatchWriteRetryCount", + "paramLongName": "esBatchWriteRetryCount", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchWriteRetryWait", + "paramLongName": "esBatchWriteRetryWait", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esBatchSizeEntries", + "paramLongName": "esBatchSizeEntries", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, + { + "paramName": "esNodesWanOnly", + "paramLongName": "esNodesWanOnly", + "paramDescription": "an ES configuration property", + "paramRequired": true + }, { "paramName": "broker", "paramLongName": "brokerApiBaseUrl", diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index c73b3fb524..248326d57e 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -36,6 +36,26 @@ esIndexHost the elasticsearch host + + esBatchWriteRetryCount + 8 + an ES configuration property + + + esBatchWriteRetryWait + 60s + an ES configuration property + + + esBatchSizeEntries + 200 + an ES configuration property + + + esNodesWanOnly + true + an ES configuration property + maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) @@ -125,6 +145,10 @@ --outputDir${outputDir} --index${esNotificationsIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml index 43c26f3a3e..9095004ad5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml @@ -13,6 +13,26 @@ esIndexHost the elasticsearch host + + esBatchWriteRetryCount + 8 + an ES configuration property + + + esBatchWriteRetryWait + 60s + an ES configuration property + + + esBatchSizeEntries + 200 + an ES configuration property + + + esNodesWanOnly + true + an ES configuration property + maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) @@ -102,6 +122,10 @@ --outputDir${outputDir} --index${esEventIndexName} --esHost${esIndexHost} + --esBatchWriteRetryCount${esBatchWriteRetryCount} + --esBatchWriteRetryWait${esBatchWriteRetryWait} + --esBatchSizeEntries${esBatchSizeEntries} + --esNodesWanOnly${esNodesWanOnly} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} --brokerApiBaseUrl${brokerApiBaseUrl}