/*
 * dnet-applications/apps/dhp-broker-application/src/main/java/eu/dnetlib/lbs/openaire/OpenaireBrokerController.java
 *
 * 316 lines
 * 13 KiB
 * Java
 */

package eu.dnetlib.lbs.openaire;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.google.gson.Gson;
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.lbs.LiteratureBrokerServiceConfiguration;
import eu.dnetlib.lbs.controllers.AbstractLbsController;
import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
import eu.dnetlib.lbs.subscriptions.MapCondition;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@RestController
@RequestMapping("/api/openaireBroker")
@Api(tags = LiteratureBrokerServiceConfiguration.TAG_OPENAIRE)
public class OpenaireBrokerController extends AbstractLbsController {
/** Class logger. */
private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);

/** Scroll time (5 minutes, in milliseconds) handed to the scroll start/continue calls. */
private static final long SCROLL_TIMEOUT_IN_MILLIS = 5L * 60L * 1000L;

/** Elasticsearch entry point used for every search issued by this controller. */
@Autowired
private ElasticsearchOperations esOperations;

/** Repository used to count the notifications of a subscription. */
@Autowired
private NotificationRepository notificationRepository;

/** Repository used to store and look up subscriptions. */
@Autowired
private SubscriptionRepository subscriptionRepo;

/** Configuration holder providing the names of the events and notifications indexes. */
@Autowired
private ElasticSearchProperties props;
/**
 * Lists the values of {@code map.targetDatasourceName} found in the events index.
 *
 * <p>Runs a match-all search carrying a nested aggregation on the {@code map} path with a
 * terms sub-aggregation (up to 1000 buckets, minimum doc count 1) on the datasource name.
 *
 * @return one {@link BrowseEntry} per terms bucket, built from the bucket key and its doc count
 */
@ApiOperation("Return the datasources having events")
@GetMapping("/datasources")
public List<BrowseEntry> findDatasourcesWithEvents() {
    final String nestedAggName = "nested";
    final String termsAggName = "by_map";

    final NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    queryBuilder.withQuery(QueryBuilders.matchAllQuery());
    queryBuilder.withSearchType(SearchType.DEFAULT);
    queryBuilder.addAggregation(AggregationBuilders.nested(nestedAggName, "map")
        .subAggregation(AggregationBuilders.terms(termsAggName)
            .field("map.targetDatasourceName")
            .size(1000)
            .minDocCount(1)));
    final NativeSearchQuery searchQuery = queryBuilder.build();

    final IndexCoordinates eventsIndex = IndexCoordinates.of(props.getEventsIndexName());
    final SearchHits<Event> hits = esOperations.search(searchQuery, Event.class, eventsIndex);

    // The terms buckets live inside the nested aggregation, so unwrap it first.
    final ParsedNested nestedAgg = (ParsedNested) hits.getAggregations().asMap().get(nestedAggName);
    final ParsedStringTerms byDatasource = (ParsedStringTerms) nestedAgg.getAggregations().asMap().get(termsAggName);

    return byDatasource.getBuckets()
        .stream()
        .map(bucket -> new BrowseEntry(bucket.getKeyAsString(), bucket.getDocCount()))
        .collect(Collectors.toList());
}
/**
 * Lists the topics of the events whose nested {@code map.targetDatasourceName} matches a datasource.
 *
 * <p>The topics come from a terms aggregation (up to 1000 buckets, minimum doc count 1) on the
 * {@code topic.keyword} field; the aggregation is registered under that same name.
 *
 * @param ds the datasource name matched against {@code map.targetDatasourceName}
 * @return one {@link BrowseEntry} per terms bucket, built from the bucket key and its doc count
 */
@ApiOperation("Return the topics of the events of a datasource")
@GetMapping("/topicsForDatasource")
public List<BrowseEntry> findTopicsForDatasource(@RequestParam final String ds) {
    final String topicField = "topic.keyword";

    final NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    queryBuilder.withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.targetDatasourceName", ds), ScoreMode.None));
    queryBuilder.withSearchType(SearchType.DEFAULT);
    queryBuilder.addAggregation(AggregationBuilders.terms(topicField).field(topicField).size(1000).minDocCount(1));
    final NativeSearchQuery searchQuery = queryBuilder.build();

    final IndexCoordinates eventsIndex = IndexCoordinates.of(props.getEventsIndexName());
    final SearchHits<Event> hits = esOperations.search(searchQuery, Event.class, eventsIndex);

    final Aggregations aggregations = hits.getAggregations();
    final ParsedStringTerms byTopic = (ParsedStringTerms) aggregations.asMap().get(topicField);

    return byTopic.getBuckets()
        .stream()
        .map(bucket -> new BrowseEntry(bucket.getKeyAsString(), bucket.getDocCount()))
        .collect(Collectors.toList());
}
/**
 * Returns one page of event payloads for a datasource and a topic.
 *
 * <p>An event is selected when its {@code topic} matches all the terms of {@code topic} and its
 * nested {@code map.targetDatasourceName} matches {@code ds}. Only the {@code payload} field is
 * requested, and each payload is parsed with {@code OaBrokerEventPayload.fromJSON}.
 *
 * @param ds the datasource name
 * @param topic the topic of the events
 * @param nPage the page index, as handed to {@code PageRequest.of}
 * @param size the page size, as handed to {@code PageRequest.of}
 * @return the page with its payloads, the total number of pages and the total number of hits
 */
@ApiOperation("Return a page of events of a datasource (by topic)")
@GetMapping("/events/{nPage}/{size}")
public EventsPage showEvents(@RequestParam final String ds, @RequestParam final String topic, @PathVariable final int nPage, @PathVariable final int size) {
    final BoolQueryBuilder query = QueryBuilders.boolQuery();
    query.must(QueryBuilders.matchQuery("topic", topic).operator(Operator.AND));
    query.must(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.targetDatasourceName", ds), ScoreMode.None));

    final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(query)
        .withSearchType(SearchType.DEFAULT)
        .withFields("payload")
        .withPageable(PageRequest.of(nPage, size))
        .build();

    final IndexCoordinates eventsIndex = IndexCoordinates.of(props.getEventsIndexName());
    final SearchHits<Event> page = esOperations.search(searchQuery, Event.class, eventsIndex);

    final List<OaBrokerEventPayload> payloads = page.stream()
        .map(hit -> OaBrokerEventPayload.fromJSON(hit.getContent().getPayload()))
        .collect(Collectors.toList());

    final long totalPages = overrideGetTotalPage(page, size);
    final long totalHits = page.getTotalHits();
    return new EventsPage(ds, topic, nPage, totalPages, totalHits, payloads);
}
/**
 * Returns one page of event payloads selected by an advanced query object.
 *
 * <p>The conditions on the nested {@code map} fields (datasource, titles, authors, subjects,
 * trust, dates) are added by {@code ElasticSearchQueryUtils}; the resulting nested query is
 * combined with a match on {@code topic} requiring all the terms of the requested topic.
 *
 * @param nPage the page index, as handed to {@code PageRequest.of}
 * @param size the page size, as handed to {@code PageRequest.of}
 * @param qObj the query object carrying topic, datasource and the optional filters
 * @return the page with its payloads, the total number of pages and the total number of hits
 */
@ApiOperation("Return a page of events of a datasource (by query)")
@PostMapping("/events/{nPage}/{size}")
public EventsPage advancedShowEvents(@PathVariable final int nPage, @PathVariable final int size, @RequestBody final AdvQueryObject qObj) {
    // Conditions evaluated inside the nested "map" document.
    final BoolQueryBuilder nestedConditions = QueryBuilders.boolQuery();
    ElasticSearchQueryUtils.addMapCondition(nestedConditions, "map.targetDatasourceName", qObj.getDatasource());
    ElasticSearchQueryUtils.addMapCondition(nestedConditions, "map.targetResultTitle", qObj.getTitles());
    ElasticSearchQueryUtils.addMapCondition(nestedConditions, "map.targetAuthors", qObj.getAuthors());
    ElasticSearchQueryUtils.addMapCondition(nestedConditions, "map.targetSubjects", qObj.getSubjects());
    ElasticSearchQueryUtils.addMapConditionForTrust(nestedConditions, "map.trust", qObj.getTrust());
    ElasticSearchQueryUtils.addMapConditionForDates(nestedConditions, "map.targetDateofacceptance", qObj.getDates());

    final BoolQueryBuilder query = QueryBuilders.boolQuery();
    query.must(QueryBuilders.matchQuery("topic", qObj.getTopic()).operator(Operator.AND));
    query.must(QueryBuilders.nestedQuery("map", nestedConditions, ScoreMode.None));

    final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(query)
        .withSearchType(SearchType.DEFAULT)
        .withFields("payload")
        .withPageable(PageRequest.of(nPage, size))
        .build();

    final IndexCoordinates eventsIndex = IndexCoordinates.of(props.getEventsIndexName());
    final SearchHits<Event> page = esOperations.search(searchQuery, Event.class, eventsIndex);

    final List<OaBrokerEventPayload> payloads = page.stream()
        .map(hit -> OaBrokerEventPayload.fromJSON(hit.getContent().getPayload()))
        .collect(Collectors.toList());

    final long totalPages = overrideGetTotalPage(page, size);
    final long totalHits = page.getTotalHits();
    return new EventsPage(qObj.getDatasource(), qObj.getTopic(), nPage, totalPages, totalHits, payloads);
}
/**
 * Converts the request body into a {@link Subscription} and saves it.
 *
 * @param oSub the subscription as sent by the client
 * @return the converted subscription that was handed to the repository (not the value
 *         returned by {@code save})
 */
@ApiOperation("Perform a subscription")
@PostMapping("/subscribe")
public Subscription registerSubscription(@RequestBody final OpenaireSubscription oSub) {
    final Subscription subscription = oSub.asSubscription();

    subscriptionRepo.save(subscription);

    return subscription;
}
/**
 * Returns the subscriptions of a subscriber, grouped by datasource.
 *
 * @param email the subscriber used to look up the subscriptions
 * @return the subscription descriptors, keyed by the datasource of each descriptor
 */
@ApiOperation("Return the subscriptions of an user (by email)")
@GetMapping("/subscriptions")
public Map<String, List<SimpleSubscriptionDesc>> subscriptions(@RequestParam final String email) {
    final List<SimpleSubscriptionDesc> descriptors = new ArrayList<>();
    for (final Subscription subscription : subscriptionRepo.findBySubscriber(email)) {
        descriptors.add(subscriptionDesc(subscription));
    }

    return descriptors.stream()
        .collect(Collectors.groupingBy(desc -> desc.getDatasource()));
}
/**
 * Returns one page of notification payloads for a subscription.
 *
 * <p>When the subscription id is unknown a warning is logged and an empty page (empty datasource
 * and topic, zero pages, zero hits) is returned instead of an error.
 *
 * @param subscrId the id of the subscription
 * @param nPage the page index, as handed to {@code PageRequest.of}
 * @param size the page size, as handed to {@code PageRequest.of}
 * @return the page with its payloads, the total number of pages and the total number of hits
 */
@ApiOperation("Return a page of notifications")
@GetMapping("/notifications/{subscrId}/{nPage}/{size}")
public EventsPage notifications(@PathVariable final String subscrId, @PathVariable final int nPage, @PathVariable final int size) {
    final Optional<Subscription> optSub = subscriptionRepo.findById(subscrId);
    if (!optSub.isPresent()) {
        log.warn("Invalid subscription: " + subscrId);
        return new EventsPage("", "", nPage, 0, 0, new ArrayList<>());
    }
    final Subscription subscription = optSub.get();

    final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
        .withSearchType(SearchType.DEFAULT)
        .withFields("payload")
        .withPageable(PageRequest.of(nPage, size))
        .build();

    final IndexCoordinates notificationsIndex = IndexCoordinates.of(props.getNotificationsIndexName());
    final SearchHits<Notification> page = esOperations.search(searchQuery, Notification.class, notificationsIndex);

    final List<OaBrokerEventPayload> payloads = page.stream()
        .map(hit -> OaBrokerEventPayload.fromJSON(hit.getContent().getPayload()))
        .collect(Collectors.toList());

    return new EventsPage(extractDatasource(subscription), subscription.getTopic(), nPage, overrideGetTotalPage(page, size), page.getTotalHits(), payloads);
}
/**
 * Opens a scroll over the notifications of a subscription and returns its first page.
 *
 * <p>When the first page has hits, the returned page carries the scroll id to use for the
 * following pages and is flagged as completed only if it has no values or no scroll id.
 * When there are no hits the scroll is released (if an id was returned) and a completed,
 * empty page is returned.
 *
 * <p>An unknown subscription id is logged and answered with the same completed, empty page
 * rather than with a page built by the no-arg {@code ScrollPage} constructor, so that the
 * response shape matches the no-hits case. NOTE(review): the defaults of the no-arg
 * constructor are not visible from this class; confirm no client relies on them.
 *
 * @param subscrId the id of the subscription
 * @return the first page of the scroll, or a completed empty page
 */
@ApiOperation("Returns notifications using scrolls (first page)")
@GetMapping("/scroll/notifications/start/{subscrId}")
public ScrollPage prepareScrollNotifications(@PathVariable final String subscrId) {
    final Optional<Subscription> optSub = subscriptionRepo.findById(subscrId);
    if (!optSub.isPresent()) {
        log.warn("Invalid subscription: " + subscrId);
        return new ScrollPage(null, true, new ArrayList<>());
    }

    // The scroll calls used below are invoked on the concrete template type.
    final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;

    final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
        .withSearchType(SearchType.DEFAULT)
        .withFields("payload")
        .build();

    final SearchScrollHits<Notification> scroll =
        esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
    final String scrollId = scroll.getScrollId();

    if (scroll.hasSearchHits()) {
        final List<OaBrokerEventPayload> values = calculateEventPayloads(scroll);
        return new ScrollPage(scrollId, values.isEmpty() || scrollId == null, values);
    }

    // Nothing to scroll: release the scroll, but never send a null id to the clear call.
    if (scrollId != null) {
        esTemplate.searchScrollClear(Arrays.asList(scrollId));
    }
    return new ScrollPage(null, true, new ArrayList<>());
}
/**
 * Returns the next page of a notifications scroll.
 *
 * <p>When the page has hits, the returned page carries the scroll id for the following call
 * and is flagged as completed only if it has no values or no scroll id. When there are no
 * more hits the scroll is released (if an id was returned) and a completed, empty page is
 * returned.
 *
 * @param scrollId the scroll id returned by the previous page
 * @return the next page of the scroll, or a completed empty page
 */
@ApiOperation("Returns notifications using scrolls (other pages)")
@GetMapping("/scroll/notifications/{scrollId}")
public ScrollPage scrollNotifications(@PathVariable final String scrollId) {
    // The scroll calls used below are invoked on the concrete template type.
    final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;

    final SearchScrollHits<Notification> scroll =
        esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
    final String nextScrollId = scroll.getScrollId();

    if (scroll.hasSearchHits()) {
        final List<OaBrokerEventPayload> values = calculateEventPayloads(scroll);
        return new ScrollPage(nextScrollId, values.isEmpty() || nextScrollId == null, values);
    }

    // Scroll exhausted: release it, but never send a null id to the clear call.
    if (nextScrollId != null) {
        esTemplate.searchScrollClear(Arrays.asList(nextScrollId));
    }
    return new ScrollPage(null, true, new ArrayList<>());
}
/**
 * Parses the payloads of the notifications contained in a scroll page.
 *
 * <p>NOTE(review): this path parses with Gson while the paged endpoints of this class use
 * {@code OaBrokerEventPayload.fromJSON}; confirm whether the two are meant to differ.
 *
 * @param scroll the scroll page returned by Elasticsearch
 * @return the parsed payloads, or a new empty list when the page has no hits
 */
private List<OaBrokerEventPayload> calculateEventPayloads(final SearchScrollHits<Notification> scroll) {
    if (scroll.getSearchHits().isEmpty()) {
        return new ArrayList<>();
    }

    final Gson gson = new Gson();
    return scroll.stream()
        .map(hit -> gson.fromJson(hit.getContent().getPayload(), OaBrokerEventPayload.class))
        .collect(Collectors.toList());
}
/**
 * Builds the short descriptor of a subscription: id, datasource, topic, creation date,
 * last notification date and the number of notifications counted for its id.
 *
 * @param s the subscription to describe
 * @return the descriptor of the subscription
 */
private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
    final String datasource = extractDatasource(s);

    return new SimpleSubscriptionDesc(
        s.getSubscriptionId(),
        datasource,
        s.getTopic(),
        s.getCreationDate(),
        s.getLastNotificationDate(),
        notificationRepository.countBySubscriptionId(s.getSubscriptionId()));
}
/**
 * Extracts the datasource name from the conditions of a subscription.
 *
 * <p>The datasource is the value of the first parameter of the first
 * {@code targetDatasourceName} condition that has a non-empty parameter list and a non-null
 * value. A subscription without such a condition yields an empty string (the same
 * placeholder used for the datasource of the empty notifications page) instead of failing
 * with a {@code NoSuchElementException}, so that one malformed subscription does not break
 * the endpoints that list subscriptions or notifications.
 *
 * @param sub the subscription to inspect
 * @return the datasource name, or an empty string when none can be found
 */
private String extractDatasource(final Subscription sub) {
    return sub.getConditionsAsList()
        .stream()
        // Constant-first comparison: a condition without a field is simply skipped.
        .filter(c -> "targetDatasourceName".equals(c.getField()))
        .map(MapCondition::getListParams)
        .filter(params -> params != null && !params.isEmpty())
        .map(params -> params.get(0).getValue())
        // findFirst() rejects null elements, so drop null values before it.
        .filter(value -> value != null)
        .findFirst()
        .orElse("");
}
/**
 * Computes the number of pages needed to show all the hits of a search: the total number
 * of hits divided by the page size, rounded up.
 *
 * @param page the search result providing the total number of hits
 * @param size the page size used as divisor
 * @return the number of pages
 */
private long overrideGetTotalPage(final SearchHits<?> page, final int size) {
    final long totalHits = page.getTotalHits();
    // Adding (size - 1) before the integer division rounds the quotient up.
    final long roundedUp = totalHits + size - 1;
    return roundedUp / size;
}
}