package eu.dnetlib.repo.manager.service; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.repo.manager.domain.RepositorySnippet; import eu.dnetlib.repo.manager.domain.Term; import eu.dnetlib.repo.manager.domain.Tuple; import eu.dnetlib.repo.manager.domain.broker.*; import eu.dnetlib.repo.manager.exception.BrokerException; import org.apache.commons.lang.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.*; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; import javax.annotation.PostConstruct; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @Service("brokerService") public class BrokerServiceImpl implements BrokerService { @Autowired private RepositoryServiceImpl repoAPI; @Value("${services.provide.broker.url}:${services.provide.broker.port}/${services.provide.broker.api}${services.provide.broker.openaire}") private String openairePath; @Value("${services.provide.broker.url}:${services.provide.broker.port}/${services.provide.broker.api}") private String apiPath; @Value("${services.provide.topic_types.url}") private String topicsURL; private static final Logger logger = LoggerFactory.getLogger(BrokerServiceImpl.class); @Autowired RestTemplate restTemplate; private HttpHeaders httpHeaders; private final HashMap topics = new HashMap<>(); @PostConstruct private void initDnetTopicsMap() { httpHeaders = new HttpHeaders(); httpHeaders.set("Content-Type", "application/json"); logger.debug("Init dnet topics!"); try (InputStream is = new URL(topicsURL).openStream()) { ObjectMapper mapper = new ObjectMapper(); JsonNode root = mapper.readTree(is); for (JsonNode term : root.path("terms")) topics.put(term.path("code").textValue(), parseTerm(term)); } catch (IOException e) { logger.error("Exception on initDnetTopicsMap", e); } } private Term parseTerm(JsonNode term) { return new Term(term.path("englishName").textValue(), term.path("nativeName").textValue(), term.path("encoding").textValue(), term.path("code").textValue()); } @Override public DatasourcesBroker getDatasourcesOfUser(String user, String includeShared, String includeByOthers) { long start = System.currentTimeMillis(); DatasourcesBroker ret = new DatasourcesBroker(); try { ret.setDatasourcesOfUser(getDatasourcesOfUserType(repoAPI.getRepositoriesSnippetsOfUser(user, "0", "100"))); //TODO fix bug when values are true // if (Boolean.parseBoolean(includeShared)) { // List sharedDatasourceIds = new ArrayList(); // ret.setSharedDatasources(getDatasourcesOfUserType(getRepositoriesByIds(sharedDatasourceIds))); // } // if (Boolean.parseBoolean(includeByOthers)) { // ret.setDatasourcesOfOthers(getDatasourcesOfUserType(getRepositoriesOfUser(user))); // } } catch (Exception e) { logger.error("Exception on getDatasourcesOfUser", e); } long end = System.currentTimeMillis(); logger.debug("Getting datasources of user in " + (end - start) + "ms"); return ret; } @Override public List getTopicsForDatasource(String datasourceName) throws BrokerException { final String service = "/topicsForDatasource"; UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service) .queryParam("ds", datasourceName); ResponseEntity> resp; try { resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference>() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } @Override public EventsPage advancedShowEvents(String page, String size, AdvQueryObject advQueryObject) throws BrokerException { final String service = "/events/{page}/{pageSize}"; long pageNum = Long.parseLong(page); advQueryObject.setPage(pageNum); Map uriParams = new HashMap<>(); uriParams.put("page", pageNum); uriParams.put("pageSize", Long.parseLong(size)); UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service); HttpEntity entity = new HttpEntity<>(advQueryObject, httpHeaders); ResponseEntity resp; try { resp = restTemplate.exchange( builder.buildAndExpand(uriParams).encode().toUri(), HttpMethod.POST, entity, new ParameterizedTypeReference() {} ); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } private List> getDatasourcesOfUserType(List repositories) throws BrokerException { long start = System.currentTimeMillis(); List> entries = new ArrayList<>(); for (RepositorySnippet repo : repositories) { BrowseEntry temp = new BrowseEntry(); temp.setValue(repo.getOfficialname()); temp.setSize(0L); for (BrowseEntry e : getTopicsForDatasource(repo.getOfficialname())) { temp.setSize(temp.getSize() + e.getSize()); } Tuple tup = new Tuple<>(temp, repo.getLogoUrl()); entries.add(tup); } // sort the collection by the second field of the tuple which is size entries.sort((e1, e2) -> (int) (e2.getFirst().getSize() - e1.getFirst().getSize())); long stop = System.currentTimeMillis(); logger.debug("getDatasourcesOfUserType returned in " + (stop - start) + "ms "); return entries; } @Override public EventsPage showEvents(String datasourceName, String topic, String page, String size) throws BrokerException { final String service = "/events"; UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service) .queryParam("ds", datasourceName) .queryParam("topic", topic) .path("/{page}/{size}/"); ResponseEntity resp; try { resp = restTemplate.exchange( builder.build().expand(page, size).encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } @Override public Map> getSimpleSubscriptionsOfUser(String userEmail) throws BrokerException { final String service = "/subscriptions"; URI uri = UriComponentsBuilder.fromHttpUrl(openairePath + service) .queryParam("email", userEmail).build().encode().toUri(); logger.debug("{}", uri); ResponseEntity>> resp; try { resp = restTemplate.exchange( uri, HttpMethod.GET, null, new ParameterizedTypeReference>>() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } @Override public Map> getSimpleSubscriptionsOfUserByRepoId(String userEmail, String repoId) { //throws BrokerException { throw new NotImplementedException(); //Map> subscriptionsOfUser = getSimpleSubscriptionsOfUser(userEmail); } @Override public Subscription subscribe(OpenaireSubscription obj) throws BrokerException { final String service = "/subscribe"; //build the uri params UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service); HttpEntity entity = new HttpEntity<>(obj, httpHeaders); //create new template engine RestTemplate template = new RestTemplate(); template.getMessageConverters().add(new MappingJackson2HttpMessageConverter()); ResponseEntity resp; try { //communicate with endpoint resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.POST, entity, new ParameterizedTypeReference() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } @Override public ResponseEntity unsubscribe(String subscriptionId) throws BrokerException { final String service = "/subscriptions/" + subscriptionId; //build the uri params UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(apiPath + service); try { //communicate with endpoint restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.DELETE, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return new ResponseEntity<>("OK", HttpStatus.OK); } @Override public Subscription getSubscription(String subscriptionId) throws BrokerException { final String service = "/subscriptions/" + subscriptionId; //build the uri params UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(apiPath + service); ResponseEntity resp; try { //communicate with endpoint resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } @Override public Map getDnetTopics() throws BrokerException { return topics; } @Override public EventsPage getNotificationsBySubscriptionId(String subscriptionId, String page, String size) throws BrokerException { UriComponents uriComponents = UriComponentsBuilder .fromHttpUrl(openairePath + "/notifications/") .path("/{id}/{page}/{size}/") .build().expand(subscriptionId, page, size).encode(); ResponseEntity resp; try { resp = restTemplate.exchange( uriComponents.toUri(), HttpMethod.GET, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { logger.error(e.getMessage()); throw new BrokerException(e); } return resp.getBody(); } //@Override public Map> getSubscriptionsOfUser(String userEmail) throws BrokerException { Map> simpleSubs = getSimpleSubscriptionsOfUser(userEmail); Map> subs = new HashMap<>(); List subscriptions = null; for (String s : simpleSubs.keySet()) { List simpleSubscriptionDescs = simpleSubs.get(s); for (SimpleSubscriptionDesc simpleSubscriptionDesc : simpleSubscriptionDescs) { subscriptions = new ArrayList<>(); subscriptions.add(getSubscription(simpleSubscriptionDesc.getId())); } subs.put(s, subscriptions); } return subs; } }