IndexNotificationsJob test: persist contents on HDFS instead of passing them to ES

This commit is contained in:
Claudio Atzori 2021-10-01 09:41:27 +02:00
parent 370dddb2fa
commit ec94cc9b93
1 changed files with 3 additions and 26 deletions

View File

@ -101,33 +101,10 @@ public class IndexNotificationsJob {
ngEncoder) ngEncoder)
.flatMap((FlatMapFunction<NotificationGroup, Notification>) g -> g.getData().iterator(), nEncoder); .flatMap((FlatMapFunction<NotificationGroup, Notification>) g -> g.getData().iterator(), nEncoder);
final JavaRDD<String> inputRdd = notifications notifications
.map((MapFunction<Notification, String>) n -> prepareForIndexing(n, total), Encoders.STRING()) .map((MapFunction<Notification, String>) n -> prepareForIndexing(n, total), Encoders.STRING())
.javaRDD(); .javaRDD()
.saveAsTextFile("/tmp/IndexNotificationsJob_test_6504");
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
esCfg.put("es.batch.size.entries", esBatchSizeEntries);
esCfg.put("es.nodes.wan.only", esNodesWanOnly);
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
log.info("*** Deleting old notifications");
final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** Deleted notifications: " + message);
log.info("*** sendNotifications (emails, ...)");
sendNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** ALL done.");
} }
} }