package eu.dnetlib.broker.oa.controllers; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Profile; import org.springframework.data.domain.PageRequest; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import org.springframework.data.elasticsearch.core.SearchHit; import org.springframework.data.elasticsearch.core.SearchScrollHits; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.google.gson.Gson; import eu.dnetlib.broker.BrokerPublicApplication; import eu.dnetlib.broker.api.ShortEventMessage; import eu.dnetlib.broker.common.elasticsearch.EventRepository; import eu.dnetlib.broker.common.elasticsearch.Notification; import eu.dnetlib.broker.common.elasticsearch.NotificationRepository; import eu.dnetlib.broker.common.properties.ElasticSearchProperties; import eu.dnetlib.broker.common.stats.OpenaireDsStatRepository; import eu.dnetlib.broker.common.subscriptions.Subscription; import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository; import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.common.controller.AbstractDnetController; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @Profile("openaire") @RestController @RequestMapping("/") @Tag(name = BrokerPublicApplication.OA_PUBLIC_APIS) public class OpenairePublicController extends AbstractDnetController { @Autowired private ElasticsearchOperations esOperations; @Autowired private EventRepository eventRepository; @Autowired private NotificationRepository notificationRepository; @Autowired private SubscriptionRepository subscriptionRepo; @Autowired private OpenaireDsStatRepository openaireDsStatRepository; @Autowired private ElasticSearchProperties props; @Value("${lbs.hadoop.opendoar.events.path}") private String opendoarEventsPath; private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000; private static final Log log = LogFactory.getLog(OpenairePublicController.class); @Operation(summary = "Returns notifications by subscription using scrolls (first page)") @GetMapping("/scroll/notifications/bySubscriptionId/{subscrId}") public ScrollPage prepareScrollNotificationsBySubscrId(@PathVariable final String subscrId) { final Optional optSub = subscriptionRepo.findById(subscrId); if (optSub.isPresent()) { final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId)) .withSearchType(SearchType.DEFAULT) .withFields("topic", "payload", "eventId") .withPageable(PageRequest.of(0, 100)) .build(); final SearchScrollHits scroll = esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); if (scroll.hasSearchHits()) { final List values = calculateNotificationMessages(scroll); return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); } else { esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); return new ScrollPage<>(null, true, new ArrayList<>()); } } else { log.warn("Invalid subscription: " + subscrId); return new ScrollPage<>(null, true, new ArrayList<>()); } } @Operation(summary = "Returns notifications using scrolls (other pages)") @GetMapping("/scroll/notifications/{scrollId}") public ScrollPage scrollNotifications(@PathVariable final String scrollId) { final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations; final SearchScrollHits scroll = esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName())); if (scroll.hasSearchHits()) { final List values = calculateNotificationMessages(scroll); return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values); } else { esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId())); return new ScrollPage<>(null, true, new ArrayList<>()); } } @Operation(summary = "Returns notifications as file") @GetMapping(value = "/file/notifications/bySubscriptionId/{subscrId}", produces = "application/gzip") public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception { final Gson gson = new Gson(); res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz"); try (final ServletOutputStream out = res.getOutputStream(); final GZIPOutputStream gzOut = new GZIPOutputStream(out)) { boolean first = true; IOUtils.write("[\n", gzOut, StandardCharsets.UTF_8); ScrollPage page = null; do { page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId()); for (final ShortEventMessage msg : page.getValues()) { if (first) { first = false; } else { IOUtils.write(",\n", gzOut, StandardCharsets.UTF_8); } IOUtils.write(gson.toJson(msg), gzOut, StandardCharsets.UTF_8); } } while (!page.isCompleted()); IOUtils.write("\n]\n", gzOut, StandardCharsets.UTF_8); gzOut.flush(); } } @Operation(summary = "Returns events as file by opendoarId") @GetMapping(value = "/file/events/opendoar/{id}", produces = "application/gzip") public void opendoarEventsAsFile(final HttpServletResponse res, @PathVariable final String id) { res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz"); final Configuration conf = new Configuration(); conf.addResource(getClass().getResourceAsStream("/core-site.xml")); conf.addResource(getClass().getResourceAsStream("/ocean-hadoop-conf.xml")); final Path pathDir = new Path(opendoarEventsPath + "/" + DigestUtils.md5Hex(id)); try (final FileSystem fs = FileSystem.get(conf); final ServletOutputStream out = res.getOutputStream(); final GZIPOutputStream gzOut = new GZIPOutputStream(out)) { boolean first = true; IOUtils.write("[\n", gzOut, StandardCharsets.UTF_8); try { for (final FileStatus fileStatus : fs.listStatus(pathDir)) { if (fileStatus.isFile()) { final Path path = fileStatus.getPath(); if (path.getName().endsWith(".json")) { try (final FSDataInputStream fis = fs.open(path); final InputStreamReader isr = new InputStreamReader(fis); final BufferedReader br = new BufferedReader(isr)) { first = processLine(gzOut, first, br); } } else if (path.getName().endsWith(".json.gz")) { try (final FSDataInputStream fis = fs.open(path); final GZIPInputStream gzIn = new GZIPInputStream(fis); final InputStreamReader isr = new InputStreamReader(gzIn); final BufferedReader br = new BufferedReader(isr)) { first = processLine(gzOut, first, br); } } } } } catch (final FileNotFoundException e) { log.warn("File not found - " + e.getMessage()); } IOUtils.write("\n]\n", gzOut, StandardCharsets.UTF_8); gzOut.flush(); } catch (final Throwable e) { log.error("Error accessing hdfs file", e); throw new RuntimeException(e); } } private boolean processLine(final GZIPOutputStream gzOut, boolean first, final BufferedReader br) throws IOException { String line = br.readLine(); while (line != null) { if (first) { first = false; } else { IOUtils.write(",\n", gzOut, StandardCharsets.UTF_8); } IOUtils.write(line, gzOut, StandardCharsets.UTF_8); line = br.readLine(); } return first; } @Operation(summary = "Returns the list of subscriptions by user email") @GetMapping(value = "/subscriptions") private Iterable listSubscriptionsByUser(@RequestParam final String email) { return subscriptionRepo.findBySubscriber(email); } @Operation(summary = "Returns the status of the application") @GetMapping(value = "/status") private Map status() { final Map res = new LinkedHashMap<>(); res.put("n_subscriptions", subscriptionRepo.count()); res.put("n_events_es", eventRepository.count()); res.put("n_events_db", openaireDsStatRepository.totalEvents()); res.put("n_notifications", notificationRepository.count()); return res; } @Operation(summary = "Store the feedback of an event (MOCK)") @RequestMapping(value = "/feedback/events", method = { RequestMethod.POST, RequestMethod.PATCH }) private Map feedbackEvent(@RequestBody final EventFeedback feedback) { // TOOD final Map res = new HashMap<>(); res.put("status", "done"); return res; } private List calculateNotificationMessages(final SearchScrollHits scroll) { if (scroll.getSearchHits().size() > 0) { return scroll.stream() .map(SearchHit::getContent) .map(this::messageFromNotification) .collect(Collectors.toList()); } else { return new ArrayList<>(); } } private ShortEventMessage messageFromNotification(final Notification n) { final Gson gson = new Gson(); final OaBrokerEventPayload payload = gson.fromJson(n.getPayload(), OaBrokerEventPayload.class); final ShortEventMessage res = new ShortEventMessage(); res.setEventId(n.getEventId()); res.setOriginalId(payload.getResult().getOriginalId()); res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); res.setTopic(n.getTopic()); res.setTrust(payload.getTrust()); res.generateMessageFromObject(payload.getHighlight()); return res; } }