undo last commit

This commit is contained in:
Claudio Atzori 2021-10-19 15:39:38 +02:00
parent e471f12d5e
commit bdffa86c2f
2 changed files with 34 additions and 42 deletions

View File

@ -17,7 +17,6 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -25,7 +24,6 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -33,7 +31,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.*; import eu.dnetlib.dhp.broker.model.ConditionParams;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.MappedFields;
import eu.dnetlib.dhp.broker.model.Notification;
import eu.dnetlib.dhp.broker.model.Subscription;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; import eu.dnetlib.dhp.broker.oa.util.NotificationGroup;
import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils;
@ -85,55 +87,46 @@ public class IndexNotificationsJob {
final List<Subscription> subscriptions = listSubscriptions(brokerApiBaseUrl); final List<Subscription> subscriptions = listSubscriptions(brokerApiBaseUrl);
log.info("Number of subscriptions: {}", subscriptions.size()); log.info("Number of subscriptions: " + subscriptions.size());
if (subscriptions.size() > 0) {
final Map<String, Map<String, List<ConditionParams>>> conditionsMap = prepareConditionsMap(subscriptions);
log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap));
if (!subscriptions.isEmpty()) {
final Encoder<NotificationGroup> ngEncoder = Encoders.bean(NotificationGroup.class); final Encoder<NotificationGroup> ngEncoder = Encoders.bean(NotificationGroup.class);
final Encoder<Notification> nEncoder = Encoders.bean(Notification.class); final Encoder<Notification> nEncoder = Encoders.bean(Notification.class);
final Dataset<Notification> notifications = ClusterUtils final Dataset<Notification> notifications = ClusterUtils
.readPath(spark, eventsPath, Event.class) .readPath(spark, eventsPath, Event.class)
.map( .map(
(MapFunction<Event, NotificationGroup>) e -> generateNotifications(e, subscriptions, startTime), (MapFunction<Event, NotificationGroup>) e -> generateNotifications(
e, subscriptions, conditionsMap, startTime),
ngEncoder) ngEncoder)
.flatMap((FlatMapFunction<NotificationGroup, Notification>) g -> g.getData().iterator(), nEncoder); .flatMap((FlatMapFunction<NotificationGroup, Notification>) g -> g.getData().iterator(), nEncoder);
final JavaRDD<String> inputRdd = notifications notifications
.map((MapFunction<Notification, String>) n -> prepareForIndexing(n, total), Encoders.STRING()) .map((MapFunction<Notification, String>) n -> prepareForIndexing(n, total), Encoders.STRING())
.javaRDD(); .javaRDD()
.saveAsTextFile("/tmp/IndexNotificationsJob_test_6504");
final Map<String, String> esCfg = new HashMap<>();
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
esCfg.put("es.batch.size.entries", esBatchSizeEntries);
esCfg.put("es.nodes.wan.only", esNodesWanOnly);
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
log.info("*** End indexing");
log.info("*** Deleting old notifications");
final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** Deleted notifications: {}", message);
log.info("*** sendNotifications (emails, ...)");
sendNotifications(brokerApiBaseUrl, startTime - 1000);
log.info("*** ALL done.");
} }
} }
private static NotificationGroup generateNotifications(final Event e, protected static Map<String, Map<String, List<ConditionParams>>> prepareConditionsMap(
final List<Subscription> subscriptions) {
final Map<String, Map<String, List<ConditionParams>>> map = new HashMap<>();
subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap()));
return map;
}
protected static NotificationGroup generateNotifications(final Event e,
final List<Subscription> subscriptions, final List<Subscription> subscriptions,
final Map<String, Map<String, List<ConditionParams>>> conditionsMap,
final long date) { final long date) {
final List<Notification> list = subscriptions final List<Notification> list = subscriptions
.stream() .stream()
.filter( .filter(
s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
.filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId())))
.map(s -> generateNotification(s, e, date)) .map(s -> generateNotification(s, e, date))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -169,28 +162,28 @@ public class IndexNotificationsJob {
return false; return false;
} }
if (conditions.containsKey("targetDateofacceptance") && conditions if (conditions.containsKey("targetDateofacceptance") && !conditions
.get("targetDateofacceptance") .get("targetDateofacceptance")
.stream() .stream()
.noneMatch( .anyMatch(
c -> SubscriptionUtils c -> SubscriptionUtils
.verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
return false; return false;
} }
if (conditions.containsKey("targetResultTitle") if (conditions.containsKey("targetResultTitle")
&& conditions && !conditions
.get("targetResultTitle") .get("targetResultTitle")
.stream() .stream()
.noneMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) {
return false; return false;
} }
if (conditions.containsKey("targetAuthors") if (conditions.containsKey("targetAuthors")
&& conditions && !conditions
.get("targetAuthors") .get("targetAuthors")
.stream() .stream()
.noneMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) {
return false; return false;
} }
@ -202,7 +195,7 @@ public class IndexNotificationsJob {
} }
private static List<Subscription> listSubscriptions(final String brokerApiBaseUrl) throws IOException { private static List<Subscription> listSubscriptions(final String brokerApiBaseUrl) throws Exception {
final String url = brokerApiBaseUrl + "/api/subscriptions"; final String url = brokerApiBaseUrl + "/api/subscriptions";
final HttpGet req = new HttpGet(url); final HttpGet req = new HttpGet(url);
@ -217,7 +210,7 @@ public class IndexNotificationsJob {
} }
} }
private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws IOException { private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception {
final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l; final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l;
final HttpDelete req = new HttpDelete(url); final HttpDelete req = new HttpDelete(url);

View File

@ -135,7 +135,6 @@
<spark-opts> <spark-opts>
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners} --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}