public apis
This commit is contained in:
parent
81218aac97
commit
d0f0231de8
|
@ -28,3 +28,4 @@ spark-warehouse
|
||||||
/**/.svn
|
/**/.svn
|
||||||
apps/dnet-orgs-database-application/data/
|
apps/dnet-orgs-database-application/data/
|
||||||
apps/dnet-orgs-database-application/src/main/resources/tmp_data
|
apps/dnet-orgs-database-application/src/main/resources/tmp_data
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package eu.dnetlib.broker.openaire;
|
package eu.dnetlib.broker.openaire;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -24,10 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Profile;
|
import org.springframework.context.annotation.Profile;
|
||||||
import org.springframework.data.domain.PageRequest;
|
import org.springframework.data.domain.PageRequest;
|
||||||
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
|
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
|
||||||
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
|
||||||
import org.springframework.data.elasticsearch.core.SearchHit;
|
import org.springframework.data.elasticsearch.core.SearchHit;
|
||||||
import org.springframework.data.elasticsearch.core.SearchHits;
|
import org.springframework.data.elasticsearch.core.SearchHits;
|
||||||
import org.springframework.data.elasticsearch.core.SearchScrollHits;
|
|
||||||
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
|
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
|
||||||
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
|
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
|
||||||
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
|
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
|
||||||
|
@ -39,8 +36,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestParam;
|
import org.springframework.web.bind.annotation.RequestParam;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.LiteratureBrokerServiceConfiguration;
|
import eu.dnetlib.broker.LiteratureBrokerServiceConfiguration;
|
||||||
import eu.dnetlib.broker.common.controllers.AbstractLbsController;
|
import eu.dnetlib.broker.common.controllers.AbstractLbsController;
|
||||||
import eu.dnetlib.broker.common.elasticsearch.Event;
|
import eu.dnetlib.broker.common.elasticsearch.Event;
|
||||||
|
@ -72,8 +67,6 @@ public class OpenaireBrokerController extends AbstractLbsController {
|
||||||
@Autowired
|
@Autowired
|
||||||
private ElasticSearchProperties props;
|
private ElasticSearchProperties props;
|
||||||
|
|
||||||
private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000;
|
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);
|
private static final Log log = LogFactory.getLog(OpenaireBrokerController.class);
|
||||||
|
|
||||||
@ApiOperation("Return the datasources having events")
|
@ApiOperation("Return the datasources having events")
|
||||||
|
@ -207,7 +200,7 @@ public class OpenaireBrokerController extends AbstractLbsController {
|
||||||
final Subscription sub = optSub.get();
|
final Subscription sub = optSub.get();
|
||||||
|
|
||||||
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
||||||
.withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
|
.withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId))
|
||||||
.withSearchType(SearchType.DEFAULT)
|
.withSearchType(SearchType.DEFAULT)
|
||||||
.withFields("payload")
|
.withFields("payload")
|
||||||
.withPageable(PageRequest.of(nPage, size))
|
.withPageable(PageRequest.of(nPage, size))
|
||||||
|
@ -229,71 +222,6 @@ public class OpenaireBrokerController extends AbstractLbsController {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation("Returns notifications using scrolls (first page)")
|
|
||||||
@GetMapping("/scroll/notifications/start/{subscrId}")
|
|
||||||
public ScrollPage prepareScrollNotifications(@PathVariable final String subscrId) {
|
|
||||||
|
|
||||||
final Optional<Subscription> optSub = subscriptionRepo.findById(subscrId);
|
|
||||||
|
|
||||||
if (optSub.isPresent()) {
|
|
||||||
|
|
||||||
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
|
||||||
|
|
||||||
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
|
||||||
.withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
|
|
||||||
.withSearchType(SearchType.DEFAULT)
|
|
||||||
.withFields("payload")
|
|
||||||
.build();
|
|
||||||
|
|
||||||
final SearchScrollHits<Notification> scroll =
|
|
||||||
esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
|
|
||||||
if (scroll.hasSearchHits()) {
|
|
||||||
final List<OaBrokerEventPayload> values = calculateEventPayloads(scroll);
|
|
||||||
return new ScrollPage(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values);
|
|
||||||
} else {
|
|
||||||
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId()));
|
|
||||||
return new ScrollPage(null, true, new ArrayList<>());
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
log.warn("Invalid subscription: " + subscrId);
|
|
||||||
return new ScrollPage();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@ApiOperation("Returns notifications using scrolls (other pages)")
|
|
||||||
@GetMapping("/scroll/notifications/{scrollId}")
|
|
||||||
public ScrollPage scrollNotifications(@PathVariable final String scrollId) {
|
|
||||||
|
|
||||||
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
|
||||||
|
|
||||||
final SearchScrollHits<Notification> scroll =
|
|
||||||
esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
|
|
||||||
if (scroll.hasSearchHits()) {
|
|
||||||
final List<OaBrokerEventPayload> values = calculateEventPayloads(scroll);
|
|
||||||
return new ScrollPage(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values);
|
|
||||||
} else {
|
|
||||||
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId()));
|
|
||||||
return new ScrollPage(null, true, new ArrayList<>());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<OaBrokerEventPayload> calculateEventPayloads(final SearchScrollHits<Notification> scroll) {
|
|
||||||
if (scroll.getSearchHits().size() > 0) {
|
|
||||||
final Gson gson = new Gson();
|
|
||||||
|
|
||||||
return scroll.stream()
|
|
||||||
.map(SearchHit::getContent)
|
|
||||||
.map(Notification::getPayload)
|
|
||||||
.map(s -> gson.fromJson(s, OaBrokerEventPayload.class))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
} else {
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
|
private SimpleSubscriptionDesc subscriptionDesc(final Subscription s) {
|
||||||
return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(),
|
return new SimpleSubscriptionDesc(s.getSubscriptionId(), extractDatasource(s), s.getTopic(), s.getCreationDate(), s.getLastNotificationDate(),
|
||||||
OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId()));
|
OpenaireBrokerController.this.notificationRepository.countBySubscriptionId(s.getSubscriptionId()));
|
||||||
|
|
|
@ -1,14 +1,21 @@
|
||||||
package eu.dnetlib.broker.oa.controllers;
|
package eu.dnetlib.broker.oa.controllers;
|
||||||
|
|
||||||
|
import java.beans.Introspector;
|
||||||
|
import java.beans.PropertyDescriptor;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
|
|
||||||
import eu.dnetlib.broker.common.elasticsearch.Notification;
|
import eu.dnetlib.broker.common.elasticsearch.Notification;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
|
||||||
|
|
||||||
public class NotificationMessage implements Serializable {
|
public class NotificationMessage implements Serializable {
|
||||||
|
|
||||||
|
@ -19,12 +26,16 @@ public class NotificationMessage implements Serializable {
|
||||||
|
|
||||||
private String originalId;
|
private String originalId;
|
||||||
|
|
||||||
|
private String title;
|
||||||
|
|
||||||
private String topic;
|
private String topic;
|
||||||
|
|
||||||
private float trust;
|
private float trust;
|
||||||
|
|
||||||
private Map<String, String> message = new LinkedHashMap<>();
|
private Map<String, String> message = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(NotificationMessage.class);
|
||||||
|
|
||||||
public static NotificationMessage fromNotification(final Notification n) {
|
public static NotificationMessage fromNotification(final Notification n) {
|
||||||
final Gson gson = new Gson();
|
final Gson gson = new Gson();
|
||||||
|
|
||||||
|
@ -33,16 +44,66 @@ public class NotificationMessage implements Serializable {
|
||||||
final NotificationMessage res = new NotificationMessage();
|
final NotificationMessage res = new NotificationMessage();
|
||||||
|
|
||||||
res.setOriginalId(payload.getResult().getOriginalId());
|
res.setOriginalId(payload.getResult().getOriginalId());
|
||||||
|
res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null));
|
||||||
res.setTopic(n.getTopic());
|
res.setTopic(n.getTopic());
|
||||||
res.setTrust(payload.getTrust());
|
res.setTrust(payload.getTrust());
|
||||||
res.setMessage(highlightAsMap(payload.getHighlight()));
|
res.setMessage(highlightAsMap("", payload.getHighlight()));
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, String> highlightAsMap(final OaBrokerMainEntity hl) {
|
private static Map<String, String> highlightAsMap(final String prefix, final Object bean) {
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
final Map<String, String> res = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (final PropertyDescriptor pd : Introspector.getBeanInfo(bean.getClass(), Object.class).getPropertyDescriptors()) {
|
||||||
|
if (pd.getReadMethod() != null) {
|
||||||
|
final Object v = pd.getReadMethod().invoke(bean);
|
||||||
|
if (v != null) {
|
||||||
|
if (v instanceof List && !((List<?>) v).isEmpty()) {
|
||||||
|
final List<?> list = (List<?>) v;
|
||||||
|
for (int i = 0; i < list.size(); i++) {
|
||||||
|
final Object x = list.get(i);
|
||||||
|
if (x instanceof String && StringUtils.isNotBlank(x.toString())) {
|
||||||
|
res.put(prefix + pd.getName() + "[" + i + "]", x.toString());
|
||||||
|
} else {
|
||||||
|
res.putAll(highlightAsMap(prefix + pd.getName() + "[" + i + "].", x));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (v instanceof String) {
|
||||||
|
res.put(prefix + pd.getName(), v.toString());
|
||||||
|
} else {
|
||||||
|
res.putAll(highlightAsMap(pd.getName() + ".", v));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.warn(e);
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOriginalId() {
|
||||||
|
return originalId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOriginalId(final String originalId) {
|
||||||
|
this.originalId = originalId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTitle() {
|
||||||
|
return title;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTitle(final String title) {
|
||||||
|
this.title = title;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getTopic() {
|
public String getTopic() {
|
||||||
|
@ -69,12 +130,4 @@ public class NotificationMessage implements Serializable {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getOriginalId() {
|
|
||||||
return originalId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOriginalId(final String originalId) {
|
|
||||||
this.originalId = originalId;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
package eu.dnetlib.broker.oa.controllers;
|
package eu.dnetlib.broker.oa.controllers;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.zip.GZIPOutputStream;
|
||||||
|
|
||||||
|
import javax.servlet.ServletOutputStream;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
@ -18,6 +19,7 @@ import org.elasticsearch.action.search.SearchType;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Profile;
|
import org.springframework.context.annotation.Profile;
|
||||||
|
import org.springframework.data.domain.PageRequest;
|
||||||
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
|
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
|
||||||
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
|
||||||
import org.springframework.data.elasticsearch.core.SearchHit;
|
import org.springframework.data.elasticsearch.core.SearchHit;
|
||||||
|
@ -43,7 +45,7 @@ import io.swagger.annotations.ApiOperation;
|
||||||
|
|
||||||
@Profile("openaire")
|
@Profile("openaire")
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/api/openaireBroker")
|
@RequestMapping("/")
|
||||||
@Api(tags = BrokerConfiguration.TAG_OPENAIRE)
|
@Api(tags = BrokerConfiguration.TAG_OPENAIRE)
|
||||||
public class OpenairePublicController extends AbstractLbsController {
|
public class OpenairePublicController extends AbstractLbsController {
|
||||||
|
|
||||||
|
@ -71,9 +73,10 @@ public class OpenairePublicController extends AbstractLbsController {
|
||||||
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
||||||
|
|
||||||
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
||||||
.withQuery(QueryBuilders.matchQuery("subscriptionId", subscrId))
|
.withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId))
|
||||||
.withSearchType(SearchType.DEFAULT)
|
.withSearchType(SearchType.DEFAULT)
|
||||||
.withFields("topic", "payload")
|
.withFields("topic", "payload")
|
||||||
|
.withPageable(PageRequest.of(0, 100))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final SearchScrollHits<Notification> scroll =
|
final SearchScrollHits<Notification> scroll =
|
||||||
|
@ -101,7 +104,7 @@ public class OpenairePublicController extends AbstractLbsController {
|
||||||
final String dsId = calculateDsIdFromOpenDoarId(opendoarId);
|
final String dsId = calculateDsIdFromOpenDoarId(opendoarId);
|
||||||
|
|
||||||
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
||||||
.withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.matchQuery("map.targetDatasourceId", dsId), ScoreMode.None))
|
.withQuery(QueryBuilders.nestedQuery("map", QueryBuilders.termQuery("map.targetDatasourceId", dsId), ScoreMode.None))
|
||||||
.withSearchType(SearchType.DEFAULT)
|
.withSearchType(SearchType.DEFAULT)
|
||||||
.withFields("topic", "payload")
|
.withFields("topic", "payload")
|
||||||
.build();
|
.build();
|
||||||
|
@ -140,36 +143,40 @@ public class OpenairePublicController extends AbstractLbsController {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiOperation("Returns notifications as file")
|
@ApiOperation("Returns notifications as file")
|
||||||
@GetMapping("/file/notifications/bySubscriptionId/{subscrId}")
|
@GetMapping(value = "/file/notifications/bySubscriptionId/{subscrId}", produces = "application/gzip")
|
||||||
public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception {
|
public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception {
|
||||||
res.setContentType("application/json");
|
|
||||||
|
|
||||||
final Gson gson = new Gson();
|
final Gson gson = new Gson();
|
||||||
|
|
||||||
final BufferedOutputStream out = new BufferedOutputStream(res.getOutputStream());
|
res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz");
|
||||||
|
|
||||||
|
try (final ServletOutputStream out = res.getOutputStream(); final GZIPOutputStream gzOut = new GZIPOutputStream(out)) {
|
||||||
|
|
||||||
boolean first = true;
|
boolean first = true;
|
||||||
|
|
||||||
IOUtils.write("[", out);
|
IOUtils.write("[", gzOut);
|
||||||
|
|
||||||
ScrollPage<NotificationMessage> page = null;
|
ScrollPage<NotificationMessage> page = null;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId());
|
page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId());
|
||||||
|
|
||||||
for (final NotificationMessage msg : page.getValues()) {
|
for (final NotificationMessage msg : page.getValues()) {
|
||||||
if (first) {
|
if (first) {
|
||||||
first = false;
|
first = false;
|
||||||
} else {
|
} else {
|
||||||
IOUtils.write(", ", out);
|
IOUtils.write(", ", gzOut);
|
||||||
}
|
}
|
||||||
IOUtils.write(gson.toJson(msg), out);
|
IOUtils.write(gson.toJson(msg), gzOut);
|
||||||
}
|
}
|
||||||
} while (!page.isCompleted());
|
} while (!page.isCompleted());
|
||||||
|
|
||||||
IOUtils.write("]", out);
|
IOUtils.write("]", gzOut);
|
||||||
|
|
||||||
|
gzOut.flush();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
out.flush();
|
|
||||||
out.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<NotificationMessage> calculateNotificationMessages(final SearchScrollHits<Notification> scroll) {
|
private List<NotificationMessage> calculateNotificationMessages(final SearchScrollHits<Notification> scroll) {
|
||||||
|
|
Loading…
Reference in New Issue