sump notifications as file

This commit is contained in:
Michele Artini 2020-09-07 16:01:29 +02:00
parent e0a60c563c
commit 58da712318
2 changed files with 83 additions and 4 deletions

View File

@ -1,11 +1,14 @@
package eu.dnetlib.broker.oa.controllers;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import com.google.gson.Gson;
import eu.dnetlib.broker.common.elasticsearch.Notification;
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
public class NotificationMessage implements Serializable {
@ -14,8 +17,14 @@ public class NotificationMessage implements Serializable {
*/
private static final long serialVersionUID = 7302363775341307950L;
private String originalId;
private String topic;
private float trust;
private Map<String, String> message = new LinkedHashMap<>();
public static NotificationMessage fromNotification(final Notification n) {
final Gson gson = new Gson();
@ -23,12 +32,19 @@ public class NotificationMessage implements Serializable {
final NotificationMessage res = new NotificationMessage();
res.setOriginalId(payload.getResult().getOriginalId());
res.setTopic(n.getTopic());
res.setTrust(payload.getTrust());
res.setMessage(highlightAsMap(payload.getHighlight()));
// TODO Auto-generated method stub
return res;
}
private static Map<String, String> highlightAsMap(final OaBrokerMainEntity hl) {
// TODO Auto-generated method stub
return null;
}
public String getTopic() {
return topic;
}
@ -37,4 +53,28 @@ public class NotificationMessage implements Serializable {
this.topic = topic;
}
public float getTrust() {
return trust;
}
public void setTrust(final float trust) {
this.trust = trust;
}
public Map<String, String> getMessage() {
return message;
}
public void setMessage(final Map<String, String> message) {
this.message = message;
}
public String getOriginalId() {
return originalId;
}
public void setOriginalId(final String originalId) {
this.originalId = originalId;
}
}

View File

@ -1,12 +1,16 @@
package eu.dnetlib.broker.oa.controllers;
import java.io.BufferedOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
@ -26,6 +30,8 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.google.gson.Gson;
import eu.dnetlib.broker.BrokerConfiguration;
import eu.dnetlib.broker.common.controllers.AbstractLbsController;
import eu.dnetlib.broker.common.elasticsearch.Notification;
@ -67,7 +73,7 @@ public class OpenairePublicController extends AbstractLbsController {
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
.withSearchType(SearchType.DEFAULT)
.withFields("payload")
.withFields("topic", "payload")
.build();
final SearchScrollHits<Notification> scroll =
@ -82,7 +88,7 @@ public class OpenairePublicController extends AbstractLbsController {
} else {
log.warn("Invalid subscription: " + subscrId);
return new ScrollPage<>();
return new ScrollPage<>(null, true, new ArrayList<>());
}
}
@ -97,7 +103,7 @@ public class OpenairePublicController extends AbstractLbsController {
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.targetDatasourceId", dsId), ScoreMode.None))
.withSearchType(SearchType.DEFAULT)
.withFields("payload")
.withFields("topic", "payload")
.build();
final SearchScrollHits<Notification> scroll =
@ -133,6 +139,39 @@ public class OpenairePublicController extends AbstractLbsController {
}
}
@ApiOperation("Returns notifications as file")
@GetMapping("/file/notifications/bySubscriptionId/{subscrId}")
public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception {
res.setContentType("application/json");
final Gson gson = new Gson();
final BufferedOutputStream out = new BufferedOutputStream(res.getOutputStream());
boolean first = true;
IOUtils.write("[", out);
ScrollPage<NotificationMessage> page = null;
do {
page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId());
for (final NotificationMessage msg : page.getValues()) {
if (first) {
first = false;
} else {
IOUtils.write(", ", out);
}
IOUtils.write(gson.toJson(msg), out);
}
} while (!page.isCompleted());
IOUtils.write("]", out);
out.flush();
out.close();
}
private List<NotificationMessage> calculateNotificationMessages(final SearchScrollHits<Notification> scroll) {
if (scroll.getSearchHits().size() > 0) {
return scroll.stream()