// Listing metadata: 252 lines, 9.0 KiB, Java
package eu.dnetlib.broker.oa.controllers;
|
|
|
|
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.google.gson.Gson;

import eu.dnetlib.broker.BrokerConfiguration;
import eu.dnetlib.broker.api.ShortEventMessage;
import eu.dnetlib.broker.common.controllers.AbstractLbsController;
import eu.dnetlib.broker.common.elasticsearch.Notification;
import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
|
|
|
|
@Profile("openaire")
|
|
@RestController
|
|
@RequestMapping("/")
|
|
@Api(tags = BrokerConfiguration.OA_PUBLIC_APIS)
|
|
public class OpenairePublicController extends AbstractLbsController {
|
|
|
|
@Autowired
|
|
private ElasticsearchOperations esOperations;
|
|
|
|
@Autowired
|
|
private SubscriptionRepository subscriptionRepo;
|
|
|
|
@Autowired
|
|
private ElasticSearchProperties props;
|
|
|
|
@Value("${lbs.hadoop.namenode.url}")
|
|
private String namenode;
|
|
|
|
@Value("${lbs.hadoop.opendoar.events.path}")
|
|
private String opendoarEventsPath;
|
|
|
|
private static final long SCROLL_TIMEOUT_IN_MILLIS = 5 * 60 * 1000;
|
|
|
|
private static final Log log = LogFactory.getLog(OpenairePublicController.class);
|
|
|
|
@ApiOperation("Returns notifications by subscription using scrolls (first page)")
|
|
@GetMapping("/scroll/notifications/bySubscriptionId/{subscrId}")
|
|
public ScrollPage<ShortEventMessage> prepareScrollNotificationsBySubscrId(@PathVariable final String subscrId) {
|
|
|
|
final Optional<Subscription> optSub = subscriptionRepo.findById(subscrId);
|
|
|
|
if (optSub.isPresent()) {
|
|
|
|
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
|
|
|
final NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
|
|
.withQuery(QueryBuilders.termQuery("subscriptionId.keyword", subscrId))
|
|
.withSearchType(SearchType.DEFAULT)
|
|
.withFields("topic", "payload")
|
|
.withPageable(PageRequest.of(0, 100))
|
|
.build();
|
|
|
|
final SearchScrollHits<Notification> scroll =
|
|
esTemplate.searchScrollStart(SCROLL_TIMEOUT_IN_MILLIS, searchQuery, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
|
|
if (scroll.hasSearchHits()) {
|
|
final List<ShortEventMessage> values = calculateNotificationMessages(scroll);
|
|
return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values);
|
|
} else {
|
|
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId()));
|
|
return new ScrollPage<>(null, true, new ArrayList<>());
|
|
}
|
|
|
|
} else {
|
|
log.warn("Invalid subscription: " + subscrId);
|
|
return new ScrollPage<>(null, true, new ArrayList<>());
|
|
}
|
|
}
|
|
|
|
@ApiOperation("Returns notifications using scrolls (other pages)")
|
|
@GetMapping("/scroll/notifications/{scrollId}")
|
|
public ScrollPage<ShortEventMessage> scrollNotifications(@PathVariable final String scrollId) {
|
|
|
|
final ElasticsearchRestTemplate esTemplate = (ElasticsearchRestTemplate) esOperations;
|
|
|
|
final SearchScrollHits<Notification> scroll =
|
|
esTemplate.searchScrollContinue(scrollId, SCROLL_TIMEOUT_IN_MILLIS, Notification.class, IndexCoordinates.of(props.getNotificationsIndexName()));
|
|
if (scroll.hasSearchHits()) {
|
|
final List<ShortEventMessage> values = calculateNotificationMessages(scroll);
|
|
return new ScrollPage<>(scroll.getScrollId(), values.isEmpty() || scroll.getScrollId() == null, values);
|
|
} else {
|
|
esTemplate.searchScrollClear(Arrays.asList(scroll.getScrollId()));
|
|
return new ScrollPage<>(null, true, new ArrayList<>());
|
|
}
|
|
}
|
|
|
|
@ApiOperation("Returns notifications as file")
|
|
@GetMapping(value = "/file/notifications/bySubscriptionId/{subscrId}", produces = "application/gzip")
|
|
public void notificationsAsFile(final HttpServletResponse res, @PathVariable final String subscrId) throws Exception {
|
|
|
|
final Gson gson = new Gson();
|
|
|
|
res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz");
|
|
|
|
try (final ServletOutputStream out = res.getOutputStream(); final GZIPOutputStream gzOut = new GZIPOutputStream(out)) {
|
|
|
|
boolean first = true;
|
|
|
|
IOUtils.write("[\n", gzOut);
|
|
|
|
ScrollPage<ShortEventMessage> page = null;
|
|
|
|
do {
|
|
page = page == null ? prepareScrollNotificationsBySubscrId(subscrId) : scrollNotifications(page.getId());
|
|
|
|
for (final ShortEventMessage msg : page.getValues()) {
|
|
if (first) {
|
|
first = false;
|
|
} else {
|
|
IOUtils.write(",\n", gzOut);
|
|
}
|
|
IOUtils.write(gson.toJson(msg), gzOut);
|
|
}
|
|
} while (!page.isCompleted());
|
|
|
|
IOUtils.write("\n]\n", gzOut);
|
|
|
|
gzOut.flush();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ApiOperation("Returns events as file by opendoarId")
|
|
@GetMapping(value = "/file/events/opendoar/{id}", produces = "application/gzip")
|
|
public void opendoarEventsAsFile(final HttpServletResponse res, @PathVariable final String id) {
|
|
|
|
res.setHeader("Content-Disposition", "attachment; filename=dump.json.gz");
|
|
|
|
final Configuration conf = new Configuration();
|
|
conf.addResource(getClass().getResourceAsStream("/core-site.xml"));
|
|
conf.addResource(getClass().getResourceAsStream("/ocean-hadoop-conf.xml"));
|
|
|
|
final Path pathDir = new Path(opendoarEventsPath + "/" + DigestUtils.md5Hex(id));
|
|
|
|
try (final FileSystem fs = FileSystem.get(conf);
|
|
final ServletOutputStream out = res.getOutputStream();
|
|
final GZIPOutputStream gzOut = new GZIPOutputStream(out)) {
|
|
boolean first = true;
|
|
|
|
IOUtils.write("[\n", gzOut);
|
|
|
|
try {
|
|
for (final FileStatus fileStatus : fs.listStatus(pathDir)) {
|
|
if (fileStatus.isFile()) {
|
|
final Path path = fileStatus.getPath();
|
|
if (path.getName().endsWith(".json")) {
|
|
try (final FSDataInputStream fis = fs.open(path);
|
|
final InputStreamReader isr = new InputStreamReader(fis);
|
|
final BufferedReader br = new BufferedReader(isr)) {
|
|
|
|
String line = br.readLine();
|
|
while (line != null) {
|
|
if (first) {
|
|
first = false;
|
|
} else {
|
|
IOUtils.write(",\n", gzOut);
|
|
}
|
|
|
|
IOUtils.write(line, gzOut);
|
|
|
|
line = br.readLine();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (final FileNotFoundException e) {
|
|
log.warn("File not found - " + e.getMessage());
|
|
}
|
|
IOUtils.write("\n]\n", gzOut);
|
|
gzOut.flush();
|
|
} catch (final Throwable e) {
|
|
log.error("Error accessing hdfs file", e);
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
|
|
private List<ShortEventMessage> calculateNotificationMessages(final SearchScrollHits<Notification> scroll) {
|
|
if (scroll.getSearchHits().size() > 0) {
|
|
return scroll.stream()
|
|
.map(SearchHit::getContent)
|
|
.map(this::messageFromNotification)
|
|
.collect(Collectors.toList());
|
|
} else {
|
|
return new ArrayList<>();
|
|
}
|
|
}
|
|
|
|
private ShortEventMessage messageFromNotification(final Notification n) {
|
|
final Gson gson = new Gson();
|
|
|
|
final OaBrokerEventPayload payload = gson.fromJson(n.getPayload(), OaBrokerEventPayload.class);
|
|
|
|
final ShortEventMessage res = new ShortEventMessage();
|
|
|
|
res.setOriginalId(payload.getResult().getOriginalId());
|
|
res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null));
|
|
res.setTopic(n.getTopic());
|
|
res.setTrust(payload.getTrust());
|
|
res.generateMessageFromObject(payload.getHighlight());
|
|
|
|
return res;
|
|
}
|
|
|
|
}
|