diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index 80549e1ce..7b65c27ab 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -101,33 +101,10 @@ public class IndexNotificationsJob { ngEncoder) .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); - final JavaRDD inputRdd = notifications + notifications .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) - .javaRDD(); - - final Map esCfg = new HashMap<>(); - // esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54"); - - esCfg.put("es.index.auto.create", "false"); - esCfg.put("es.nodes", indexHost); - esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); - esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); - esCfg.put("es.batch.size.entries", esBatchSizeEntries); - esCfg.put("es.nodes.wan.only", esNodesWanOnly); - - log.info("*** Start indexing"); - JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); - log.info("*** End indexing"); - - log.info("*** Deleting old notifications"); - final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** Deleted notifications: " + message); - - log.info("*** sendNotifications (emails, ...)"); - sendNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** ALL done."); - + .javaRDD() + .saveAsTextFile("/tmp/IndexNotificationsJob_test_6504"); } }