127 lines
4.8 KiB
Java
127 lines
4.8 KiB
Java
package eu.dnetlib.lbs.controllers;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.List;
|
|
import java.util.Optional;
|
|
import java.util.stream.Collectors;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.elasticsearch.action.search.SearchType;
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.context.annotation.Profile;
|
|
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
|
|
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
|
import org.springframework.data.elasticsearch.core.SearchHit;
|
|
import org.springframework.data.elasticsearch.core.SearchScrollHits;
|
|
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
|
|
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
|
|
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
|
|
import org.springframework.web.bind.annotation.GetMapping;
|
|
import org.springframework.web.bind.annotation.PathVariable;
|
|
import org.springframework.web.bind.annotation.RequestMapping;
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
|
|
import com.google.gson.Gson;
|
|
|
|
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
|
import eu.dnetlib.lbs.BrokerConfiguration;
|
|
import eu.dnetlib.lbs.elasticsearch.Notification;
|
|
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
|
|
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
|
|
import eu.dnetlib.lbs.subscriptions.Subscription;
|
|
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
|
|
import io.swagger.annotations.Api;
|
|
import io.swagger.annotations.ApiOperation;
|
|
|
|
@Profile("openaire")
|
|
@RestController
|
|
@RequestMapping("/api/openaireBroker")
|
|
@Api(tags = BrokerConfiguration.TAG_OPENAIRE)
|
|
public class OpenaireBrokerController extends AbstractLbsController {
|
|
|
|
@Autowired
|
|
private ElasticsearchOperations esOperations;
|
|
|
|
@Autowired
|
|
private NotificationRepository notificationRepository;
|
|
|
|
@Autowired
|
|
private SubscriptionRepository subscriptionRepo;
|
|
|
|
@Autowired
|
|
private ElasticSearchProperties props;
|
|
|
|
private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000;
|
|
|
|
private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);
|
|
|
|
@ApiOperation("Returns notifications using scrolls (first page)")
|
|
@GetMapping("/scroll/notifications/start/{subscrId}")
|
|
public ScrollPage prepareScrollNotifications(@PathVariable final String subscrId) {
|
|
|
|
final Optional<Subscription> optSub = subscriptionRepo.findById(subscrId);
|
|
|
|
if (optSub.isPresent()) {
|
|
|
|
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
|
|
|
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
|
.withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
|
|
.withSearchType(SearchType.DEFAULT)
|
|
.withFields("payload")
|
|
.build();
|
|
|
|
final SearchScrollHits<Notification> scroll =
|
|
esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
|
|
if (scroll.hasSearchHits()) {
|
|
final List<OaBrokerEventPayload> values = calculateEventPayloads(scroll);
|
|
return new ScrollPage(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values);
|
|
} else {
|
|
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId()));
|
|
return new ScrollPage(null, true, new ArrayList<>());
|
|
}
|
|
|
|
} else {
|
|
log.warn("Invalid subscription: " + subscrId);
|
|
return new ScrollPage();
|
|
}
|
|
|
|
}
|
|
|
|
@ApiOperation("Returns notifications using scrolls (other pages)")
|
|
@GetMapping("/scroll/notifications/{scrollId}")
|
|
public ScrollPage scrollNotifications(@PathVariable final String scrollId) {
|
|
|
|
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
|
|
|
final SearchScrollHits<Notification> scroll =
|
|
esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
|
|
if (scroll.hasSearchHits()) {
|
|
final List<OaBrokerEventPayload> values = calculateEventPayloads(scroll);
|
|
return new ScrollPage(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values);
|
|
} else {
|
|
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId()));
|
|
return new ScrollPage(null, true, new ArrayList<>());
|
|
}
|
|
}
|
|
|
|
private List<OaBrokerEventPayload> calculateEventPayloads(final SearchScrollHits<Notification> scroll) {
|
|
if (scroll.getSearchHits().size() > 0) {
|
|
final Gson gson = new Gson();
|
|
|
|
return scroll.stream()
|
|
.map(SearchHit::getContent)
|
|
.map(Notification::getPayload)
|
|
.map(s -> gson.fromJson(s, OaBrokerEventPayload.class))
|
|
.collect(Collectors.toList());
|
|
} else {
|
|
return new ArrayList<>();
|
|
}
|
|
|
|
}
|
|
|
|
}
|