diff --git a/.gitignore b/.gitignore index 80086004..34ca67a7 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ spark-warehouse /**/.svn apps/dnet-orgs-database-application/data/ apps/dnet-orgs-database-application/src/main/resources/tmp_data + diff --git a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java index fff19a27..1fd7e1ab 100644 --- a/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java +++ b/apps/dhp-broker-application/src/main/java/eu/dnetlib/broker/openaire/OpenaireBrokerController.java @@ -1,7 +1,6 @@ package eu.dnetlib.broker.openaire; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -24,10 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.data.domain.PageRequest; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; -import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import org.springframework.data.elasticsearch.core.SearchHit; import org.springframework.data.elasticsearch.core.SearchHits; -import org.springframework.data.elasticsearch.core.SearchScrollHits; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; @@ -39,8 +36,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; -import com.google.gson.Gson; - import eu.dnetlib.broker.LiteratureBrokerServiceConfiguration; import eu.dnetlib.broker.common.controllers.AbstractLbsController; import eu.dnetlib.broker.common.elasticsearch.Event; @@ -72,8 +67,6 @@ public class OpenaireBrokerController extends AbstractLbsController { @Autowired private ElasticSearchProperties props; - private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000; - private static final Log log = LogFactory.getLog(OpenaireBrokerController.class); @ApiOperation("Return the datasources having events") @@ -207,7 +200,7 @@ public class OpenaireBrokerController extends AbstractLbsController { final Subscription sub = optSub.get(); final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() - .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId)) + .withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId)) .withSearchType(SearchType.DEFAULT) .withFields("payload") .withPageable(PageRequest.of(nPage, size)) @@ -229,71 +222,6 @@ public class OpenaireBrokerController extends AbstractLbsController { } - @ApiOperation("Returns notifications using scrolls (first page)") - @GetMapping("/scroll/notifications/start/{subscrId}") - public ScrollPage prepareScrollNotifications(@PathVariable final String subscrId) { - - final Optional optSub = subscriptionRepo.findById(subscrId); - - if (optSub.isPresent()) { - - final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; - - final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() - .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId)) - .withSearchType(SearchType.DEFAULT) - .withFields("payload") - .build(); - - final SearchScrollHits scroll = - esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); - if (scroll.hasSearchHits()) { - final List values = calculateEventPayloads(scroll); - return new ScrollPage(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); - } else { - esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); - return new ScrollPage(null, true, new ArrayList<>()); - } - - } else { - log.warn("Invalid subscription: " + subscrId); - return new ScrollPage(); - } - - } - - @ApiOperation("Returns notifications using scrolls (other pages)") - @GetMapping("/scroll/notifications/{scrollId}") - public ScrollPage scrollNotifications(@PathVariable final String scrollId) { - - final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; - - final SearchScrollHits scroll = - esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); - if (scroll.hasSearchHits()) { - final List values = calculateEventPayloads(scroll); - return new ScrollPage(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); - } else { - esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); - return new ScrollPage(null, true, new ArrayList<>()); - } - } - - private List calculateEventPayloads(final SearchScrollHits scroll) { - if (scroll.getSearchHits().size() > 0) { - final Gson gson = new Gson(); - - return scroll.stream() - .map(SearchHit::getContent) - .map(Notification::getPayload) - .map(s -> gson.fromJson(s, OaBrokerEventPayload.class)) - .collect(Collectors.toList()); - } else { - return new ArrayList<>(); - } - - } - private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) { return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(), OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId())); diff --git a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/NotificationMessage.java b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/NotificationMessage.java index 2410829d..8f9edfed 100644 --- a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/NotificationMessage.java +++ b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/NotificationMessage.java @@ -1,14 +1,21 @@ package eu.dnetlib.broker.oa.controllers; +import java.beans.Introspector; +import java.beans.PropertyDescriptor; import java.io.Serializable; +import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import com.google.gson.Gson; import eu.dnetlib.broker.common.elasticsearch.Notification; import eu.dnetlib.broker.objects.OaBrokerEventPayload; -import eu.dnetlib.broker.objects.OaBrokerMainEntity; public class NotificationMessage implements Serializable { @@ -19,12 +26,16 @@ public class NotificationMessage implements Serializable { private String originalId; + private String title; + private String topic; private float trust; private Map message = new LinkedHashMap<>(); + private static final Log log = LogFactory.getLog(NotificationMessage.class); + public static NotificationMessage fromNotification(final Notification n) { final Gson gson = new Gson(); @@ -33,16 +44,66 @@ public class NotificationMessage implements Serializable { final NotificationMessage res = new NotificationMessage(); res.setOriginalId(payload.getResult().getOriginalId()); + res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); res.setTopic(n.getTopic()); res.setTrust(payload.getTrust()); - res.setMessage(highlightAsMap(payload.getHighlight())); + res.setMessage(highlightAsMap("", payload.getHighlight())); return res; } - private static Map highlightAsMap(final OaBrokerMainEntity hl) { - // TODO Auto-generated method stub - return null; + private static Map highlightAsMap(final String prefix, final Object bean) { + + final Map res = new LinkedHashMap<>(); + + try { + for (final PropertyDescriptor pd : Introspector.getBeanInfo(bean.getClass(), Object.class).getPropertyDescriptors()) { + if (pd.getReadMethod() != null) { + final Object v = pd.getReadMethod().invoke(bean); + if (v != null) { + if (v instanceof List && !((List) v).isEmpty()) { + final List list = (List) v; + for (int i = 0; i < list.size(); i++) { + final Object x = list.get(i); + if (x instanceof String && StringUtils.isNotBlank(x.toString())) { + res.put(prefix + pd.getName() + "[" + i + "]", x.toString()); + } else { + res.putAll(highlightAsMap(prefix + pd.getName() + "[" + i + "].", x)); + } + } + } else if (v instanceof String) { + res.put(prefix + pd.getName(), v.toString()); + } else { + res.putAll(highlightAsMap(pd.getName() + ".", v)); + } + } + } + + } + + return res; + + } catch (final Exception e) { + log.warn(e); + return Collections.emptyMap(); + } + + } + + public String getOriginalId() { + return originalId; + } + + public void setOriginalId(final String originalId) { + this.originalId = originalId; + } + + public String getTitle() { + return title; + } + + public void setTitle(final String title) { + this.title = title; } public String getTopic() { @@ -69,12 +130,4 @@ public class NotificationMessage implements Serializable { this.message = message; } - public String getOriginalId() { - return originalId; - } - - public void setOriginalId(final String originalId) { - this.originalId = originalId; - } - } diff --git a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java index 2e72fa98..1f9662c9 100644 --- a/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java +++ b/apps/dhp-broker-public-application/src/main/java/eu/dnetlib/broker/oa/controllers/OpenairePublicController.java @@ -1,12 +1,13 @@ package eu.dnetlib.broker.oa.controllers; -import java.io.BufferedOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import org.apache.commons.codec.digest.DigestUtils; @@ -18,6 +19,7 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; +import org.springframework.data.domain.PageRequest; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import org.springframework.data.elasticsearch.core.SearchHit; @@ -43,7 +45,7 @@ import io.swagger.annotations.ApiOperation; @Profile("openaire") @RestController -@RequestMapping("/api/openaireBroker") +@RequestMapping("/") @Api(tags = BrokerConfiguration.TAG_OPENAIRE) public class OpenairePublicController extends AbstractLbsController { @@ -71,9 +73,10 @@ public class OpenairePublicController extends AbstractLbsController { final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() - .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId)) + .withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId)) .withSearchType(SearchType.DEFAULT) .withFields("topic", "payload") + .withPageable(PageRequest.of(0, 100)) .build(); final SearchScrollHits scroll = @@ -101,7 +104,7 @@ public class OpenairePublicController extends AbstractLbsController { final String dsId = calculateDsIdFromOpenDoarId(opendoarId); final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() - .withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.targetDatasourceId", dsId), ScoreMode.None)) + .withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.termQuery("map.targetDatasourceId", dsId), ScoreMode.None)) .withSearchType(SearchType.DEFAULT) .withFields("topic", "payload") .build(); @@ -140,36 +143,40 @@ public class OpenairePublicController extends AbstractLbsController { } @ApiOperation("Returns notifications as file") - @GetMapping("/file/notifications/bySubscriptionId/{subscrId}") + @GetMapping(value = "/file/notifications/bySubscriptionId/{subscrId}", produces = "application/gzip") public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception { - res.setContentType("application/json"); final Gson gson = new Gson(); - final BufferedOutputStream out = new BufferedOutputStream(res.getOutputStream()); + res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz"); - boolean first = true; + try (final ServletOutputStream out = res.getOutputStream(); final GZIPOutputStream gzOut = new GZIPOutputStream(out)) { - IOUtils.write("[", out); + boolean first = true; - ScrollPage page = null; + IOUtils.write("[", gzOut); - do { - page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId()); - for (final NotificationMessage msg : page.getValues()) { - if (first) { - first = false; - } else { - IOUtils.write(", ", out); + ScrollPage page = null; + + do { + page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId()); + + for (final NotificationMessage msg : page.getValues()) { + if (first) { + first = false; + } else { + IOUtils.write(", ", gzOut); + } + IOUtils.write(gson.toJson(msg), gzOut); } - IOUtils.write(gson.toJson(msg), out); - } - } while (!page.isCompleted()); + } while (!page.isCompleted()); - IOUtils.write("]", out); + IOUtils.write("]", gzOut); + + gzOut.flush(); + + } - out.flush(); - out.close(); } private List calculateNotificationMessages(final SearchScrollHits scroll) {