deleting events

This commit is contained in:
Michele Artini 2020-08-21 14:52:48 +02:00
parent 6e60bf026a
commit da470422d3
5 changed files with 46 additions and 0 deletions

View File

@ -31,6 +31,10 @@
<artifactId>elasticsearch-hadoop</artifactId> <artifactId>elasticsearch-hadoop</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>

View File

@ -7,6 +7,10 @@ import java.util.Map;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -54,6 +58,9 @@ public class IndexEventSubsetJob {
final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic")); final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic"));
log.info("maxEventsForTopic: {}", maxEventsForTopic); log.info("maxEventsForTopic: {}", maxEventsForTopic);
final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final TypedColumn<Event, EventGroup> aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn(); final TypedColumn<Event, EventGroup> aggr = new EventSubsetAggregator(maxEventsForTopic).toColumn();
@ -84,7 +91,26 @@ public class IndexEventSubsetJob {
esCfg.put("es.batch.size.entries", "200"); esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true"); esCfg.put("es.nodes.wan.only", "true");
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
log.info("*** Deleting old events");
final String message = deleteOldEvents(brokerApiBaseUrl, now - 1000);
log.info("*** Deleted events: " + message);
}
private static String deleteOldEvents(final String brokerApiBaseUrl, final long l) throws Exception {
final String url = brokerApiBaseUrl + "/api/events/byCreationDate/0/" + l;
final HttpDelete req = new HttpDelete(url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
try (final CloseableHttpResponse response = client.execute(req)) {
return IOUtils.toString(response.getEntity().getContent());
}
}
} }
private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc) private static String prepareEventForIndexing(final Event e, final long creationDate, final LongAccumulator acc)

View File

@ -36,6 +36,10 @@
<name>maxIndexedEventsForDsAndTopic</name> <name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description> <description>the max number of events for each couple (ds/topic)</description>
</property> </property>
<property>
<name>brokerApiBaseUrl</name>
<description>the url of the broker service api</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -452,6 +456,7 @@
<arg>--index</arg><arg>${esEventIndexName}</arg> <arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg> <arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark> </spark>
<ok to="stats"/> <ok to="stats"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -22,5 +22,11 @@
"paramLongName": "maxEventsForTopic", "paramLongName": "maxEventsForTopic",
"paramDescription": "the max number of events for each couple (ds/topic)", "paramDescription": "the max number of events for each couple (ds/topic)",
"paramRequired": true "paramRequired": true
},
{
"paramName": "broker",
"paramLongName": "brokerApiBaseUrl",
"paramDescription": "the url of the broker service api",
"paramRequired": true
} }
] ]

View File

@ -36,6 +36,10 @@
<name>maxIndexedEventsForDsAndTopic</name> <name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description> <description>the max number of events for each couple (ds/topic)</description>
</property> </property>
<property>
<name>brokerApiBaseUrl</name>
<description>the url of the broker service api</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -118,6 +122,7 @@
<arg>--index</arg><arg>${esEventIndexName}</arg> <arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg> <arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>