diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index f98708c648..75cc0ea095 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -31,6 +31,10 @@ elasticsearch-hadoop + + org.apache.httpcomponents + httpclient + eu.dnetlib.dhp diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java index 17451d067d..d3cbe0034f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.java @@ -7,6 +7,10 @@ import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; @@ -54,6 +58,9 @@ public class IndexEventSubsetJob { final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); log.info("maxEventsForTopic: {}", maxEventsForTopic); + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final TypedColumn aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn(); @@ -84,7 +91,26 @@ public class IndexEventSubsetJob { esCfg.put("es.batch.size.entries", "200"); esCfg.put("es.nodes.wan.only", "true"); + log.info("*** Start indexing"); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); + log.info("*** End indexing"); + + log.info("*** Deleting old events"); + final String message = deleteOldEvents(brokerApiBaseUrl, now - 1000); + log.info("*** Deleted events: " + message); + + } + + private static String deleteOldEvents(final String brokerApiBaseUrl, final long l) throws Exception { + final String url = brokerApiBaseUrl + "/api/events/byCreationDate/0/" + l; + final HttpDelete req = new HttpDelete(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + return IOUtils.toString(response.getEntity().getContent()); + } + } + } private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc) diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index a10794f637..2e669676bf 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -36,6 +36,10 @@ maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) + + brokerApiBaseUrl + the url of the broker service api + sparkDriverMemory memory for driver process @@ -452,6 +456,7 @@ --index${esEventIndexName} --esHost${esIndexHost} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json index 72703ae335..4921bc03e2 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/index_event_subset.json @@ -22,5 +22,11 @@ "paramLongName": "maxEventsForTopic", "paramDescription": "the max number of events for each couple (ds/topic)", "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true } ] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 306343dbe4..0b0557693c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -36,6 +36,10 @@ maxIndexedEventsForDsAndTopic the max number of events for each couple (ds/topic) + + brokerApiBaseUrl + the url of the broker service api + sparkDriverMemory memory for driver process @@ -118,6 +122,7 @@ --index${esEventIndexName} --esHost${esIndexHost} --maxEventsForTopic${maxIndexedEventsForDsAndTopic} + --brokerApiBaseUrl${brokerApiBaseUrl}