package eu.dnetlib.broker.oa.controllers;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

import eu.dnetlib.broker.BrokerConfiguration;
import eu.dnetlib.broker.api.ShortEventMessage;
import eu.dnetlib.broker.common.controllers.AbstractLbsController;
import eu.dnetlib.broker.common.elasticsearch.Notification;
import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository; import eu.dnetlib.broker.objects.OaBrokerEventPayload; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @Profile("openaire") @RestController @RequestMapping("/") @Api(tags = BrokerConfiguration.TAG_OPENAIRE) public class OpenairePublicController extends AbstractLbsController { @Autowired private ElasticsearchOperations esOperations; @Autowired private SubscriptionRepository subscriptionRepo; @Autowired private ElasticSearchProperties props; private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000; private static final Log log = LogFactory.getLog(OpenairePublicController.class); @ApiOperation("Returns notifications by subscription using scrolls (first page)") @GetMapping("/scroll/notifications/bySubscriptionId/{subscrId}") public ScrollPage prepareScrollNotificationsBySubscrId(@PathVariable final String subscrId) { final Optional optSub = subscriptionRepo.findById(subscrId); if (optSub.isPresent()) { final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId)) .withSearchType(SearchType.DEFAULT) .withFields("topic", "payload") .withPageable(PageRequest.of(0, 100)) .build(); final SearchScrollHits scroll = esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); if (scroll.hasSearchHits()) { final List values = calculateNotificationMessages(scroll); return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); } else { esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); return new ScrollPage<>(null, true, new ArrayList<>()); } } else { log.warn("Invalid subscription: " + subscrId); return new ScrollPage<>(null, true, new ArrayList<>()); } } 
@ApiOperation("Returns notifications by opendorar Id (for example: 301) using scrolls (first page)") @GetMapping("/scroll/notifications/byOpenDoarId/{opendoarId}") public ScrollPage prepareScrollNotificationsByOpendoarId(@PathVariable final String opendoarId) { final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; final String dsId = calculateDsIdFromOpenDoarId(opendoarId); final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.termQuery("map.targetDatasourceId", dsId), ScoreMode.None)) .withSearchType(SearchType.DEFAULT) .withFields("topic", "payload") .build(); final SearchScrollHits scroll = esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); if (scroll.hasSearchHits()) { final List values = calculateNotificationMessages(scroll); return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); } else { esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); return new ScrollPage<>(null, true, new ArrayList<>()); } } private String calculateDsIdFromOpenDoarId(final String opendoarId) { return "10|opendoar____::" + DigestUtils.md5Hex(opendoarId); } @ApiOperation("Returns notifications using scrolls (other pages)") @GetMapping("/scroll/notifications/{scrollId}") public ScrollPage scrollNotifications(@PathVariable final String scrollId) { final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; final SearchScrollHits scroll = esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); if (scroll.hasSearchHits()) { final List values = calculateNotificationMessages(scroll); return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); } else { 
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); return new ScrollPage<>(null, true, new ArrayList<>()); } } @ApiOperation("Returns notifications as file") @GetMapping(value = "/file/notifications/bySubscriptionId/{subscrId}", produces = "application/gzip") public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception { final Gson gson = new Gson(); res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz"); try (final ServletOutputStream out = res.getOutputStream(); final GZIPOutputStream gzOut = new GZIPOutputStream(out)) { boolean first = true; IOUtils.write("[\n", gzOut); ScrollPage page = null; do { page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId()); for (final ShortEventMessage msg : page.getValues()) { if (first) { first = false; } else { IOUtils.write(",\n", gzOut); } IOUtils.write(gson.toJson(msg), gzOut); } } while (!page.isCompleted()); IOUtils.write("\n]\n", gzOut); gzOut.flush(); } } private List calculateNotificationMessages(final SearchScrollHits scroll) { if (scroll.getSearchHits().size() > 0) { return scroll.stream() .map(SearchHit::getContent) .map(this::messageFromNotification) .collect(Collectors.toList()); } else { return new ArrayList<>(); } } private ShortEventMessage messageFromNotification(final Notification n) { final Gson gson = new Gson(); final OaBrokerEventPayload payload = gson.fromJson(n.getPayload(), OaBrokerEventPayload.class); final ShortEventMessage res = new ShortEventMessage(); res.setOriginalId(payload.getResult().getOriginalId()); res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); res.setTopic(n.getTopic()); res.setTrust(payload.getTrust()); res.generateMessageFromObject(payload.getHighlight()); return res; } }