package eu.dnetlib.repo.manager.service; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.repo.manager.domain.BrokerException; import eu.dnetlib.repo.manager.domain.RepositorySnippet; import eu.dnetlib.repo.manager.domain.Term; import eu.dnetlib.repo.manager.domain.Tuple; import eu.dnetlib.repo.manager.domain.broker.*; import org.apache.commons.lang.NotImplementedException; import org.json.JSONException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.*; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; import javax.annotation.PostConstruct; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.*; @Service("brokerService") public class BrokerServiceImpl implements BrokerService { @Autowired private RepositoryServiceImpl repoAPI; @Value("${services.broker.url}:${services.broker.port}/${services.broker.api}${services.broker.openaire}") private String openairePath; @Value("${services.broker.url}:${services.broker.port}/${services.broker.api}") private String apiPath; @Value("${topic_types.url}") private String topicsURL; private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger .getLogger(BrokerServiceImpl.class); @Autowired RestTemplate restTemplate; private HttpHeaders httpHeaders; private HashMap topics = new HashMap(); @PostConstruct private void initDnetTopicsMap() { httpHeaders = new HttpHeaders(); httpHeaders.set("Content-Type", "application/json"); LOGGER.debug("Init dnet topics!"); try (InputStream is = new URL(topicsURL).openStream()) { ObjectMapper mapper = new ObjectMapper(); JsonNode root = mapper.readTree(is); for (JsonNode term : root.path("terms")) topics.put(term.path("code").textValue(), parseTerm(term)); } catch (IOException e) { LOGGER.error("Exception on initDnetTopicsMap", e); } } private Term parseTerm(JsonNode term) { return new Term(term.path("englishName").textValue(), term.path("nativeName").textValue(), term.path("encoding").textValue(), term.path("code").textValue()); } @Override public DatasourcesBroker getDatasourcesOfUser(String user, String includeShared, String includeByOthers) throws JSONException { long start = System.currentTimeMillis(); DatasourcesBroker ret = new DatasourcesBroker(); try { ret.setDatasourcesOfUser(getDatasourcesOfUserType(repoAPI.getRepositoriesSnippetsOfUser(user, "0", "100"))); //TODO fix bug when values are true // if (Boolean.parseBoolean(includeShared)) { // List sharedDatasourceIds = new ArrayList(); // ret.setSharedDatasources(getDatasourcesOfUserType(getRepositoriesByIds(sharedDatasourceIds))); // } // if (Boolean.parseBoolean(includeByOthers)) { // ret.setDatasourcesOfOthers(getDatasourcesOfUserType(getRepositoriesOfUser(user))); // } } catch (Exception e) { LOGGER.error("Exception on getDatasourcesOfUser", e); } long end = System.currentTimeMillis(); System.out.println("Getting datasources of user in " + (end - start) + "ms"); return ret; } @Override public List getTopicsForDatasource(String datasourceName) throws BrokerException { final String service = "/topicsForDatasource"; UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service) .queryParam("ds", datasourceName); ResponseEntity> resp; try { resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference>() { }); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } @Override public EventsPage advancedShowEvents(String page, String size, AdvQueryObject advQueryObject) throws BrokerException, JSONException, IOException { final String service = "/events/{page}/{pageSize}"; Map uriParams = new HashMap<>(); uriParams.put("page", Long.parseLong(page)); uriParams.put("pageSize", Long.parseLong(size)); UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service); MultiValueMap headers = new LinkedMultiValueMap<>(); advQueryObject.setPage(Long.parseLong(page)); HttpEntity entity = new HttpEntity<>(advQueryObject, httpHeaders); ResponseEntity resp; try { resp = restTemplate.exchange( builder.buildAndExpand(uriParams).encode().toUri(), HttpMethod.POST, entity, new ParameterizedTypeReference() { } ); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } private List> getDatasourcesOfUserType(List repositories) throws BrokerException { long start = System.currentTimeMillis(); List> entries = new ArrayList<>(); for (RepositorySnippet repo : repositories) { BrowseEntry temp = new BrowseEntry(); temp.setValue(repo.getOfficialname()); temp.setSize(new Long(0)); for (BrowseEntry e : getTopicsForDatasource(repo.getOfficialname())) { temp.setSize(temp.getSize() + e.getSize()); } Tuple tup = new Tuple<>(temp, repo.getLogoUrl()); entries.add(tup); } // sort the collection by the second field of the tuple which is size Collections.sort(entries, new Comparator>() { @Override public int compare(Tuple e1, Tuple e2) { return (int) (e2.getFirst().getSize().longValue() - e1.getFirst().getSize().longValue()); } }); long stop = System.currentTimeMillis(); System.out.println("getDatasourcesOfUserType returned in " + (stop - start) + "ms "); return entries; } @Override public EventsPage showEvents(String datasourceName, String topic, String page, String size) throws BrokerException, JSONException { final String service = "/events"; UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service) .queryParam("ds", datasourceName) .queryParam("topic", topic) .path("/{page}/{size}/"); ResponseEntity resp; try { resp = restTemplate.exchange( builder.build().expand(page, size).encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } @Override public Map> getSimpleSubscriptionsOfUser(String userEmail) throws BrokerException { final String service = "/subscriptions"; UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service) .queryParam("email", userEmail); LOGGER.debug(builder.build().encode().toUri()); ResponseEntity>> resp; try { resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference>>() { }); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } @Override public Map> getSimpleSubscriptionsOfUserByRepoId(String userEmail, String repoId) throws BrokerException { Map> subscriptionsOfUser = getSimpleSubscriptionsOfUser(userEmail); throw new NotImplementedException(); // return null; } @Override public Subscription subscribe(OpenaireSubscription obj) throws BrokerException { final String service = "/subscribe"; //build the uri params UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(openairePath + service); HttpEntity entity = new HttpEntity<>(obj, httpHeaders); //create new template engine RestTemplate template = new RestTemplate(); template.getMessageConverters().add(new MappingJackson2HttpMessageConverter()); ResponseEntity resp; try { //communicate with endpoint resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.POST, entity, new ParameterizedTypeReference() { }); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } @Override public ResponseEntity unsubscribe(String subscriptionId) throws BrokerException { final String service = "/subscriptions/" + subscriptionId; //build the uri params UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(apiPath + service); try { //communicate with endpoint restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.DELETE, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { throw new BrokerException(e); } return new ResponseEntity<>("OK", HttpStatus.OK); } @Override public Subscription getSubscription(String subscriptionId) throws BrokerException { final String service = "/subscriptions/" + subscriptionId; //build the uri params UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(apiPath + service); ResponseEntity resp; try { //communicate with endpoint resp = restTemplate.exchange( builder.build().encode().toUri(), HttpMethod.GET, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } @Override public Map getDnetTopics() throws BrokerException { return topics; } @Override public EventsPage getNotificationsBySubscriptionId(String subscriptionId, String page, String size) throws BrokerException { UriComponents uriComponents = UriComponentsBuilder .fromHttpUrl(openairePath + "/notifications/") .path("/{id}/{page}/{size}/") .build().expand(subscriptionId, page, size).encode(); ResponseEntity resp; try { resp = restTemplate.exchange( uriComponents.toUri(), HttpMethod.GET, null, new ParameterizedTypeReference() { }); } catch (RestClientException e) { throw new BrokerException(e); } return resp.getBody(); } //@Override public Map> getSubscriptionsOfUser(String userEmail) throws BrokerException { Map> simpleSubs = getSimpleSubscriptionsOfUser(userEmail); Map> subs = new HashMap<>(); List subscriptions = null; for (String s : simpleSubs.keySet()) { List simpleSubscriptionDescs = simpleSubs.get(s); for (SimpleSubscriptionDesc simpleSubscriptionDesc : simpleSubscriptionDescs) { subscriptions = new ArrayList<>(); subscriptions.add(getSubscription(simpleSubscriptionDesc.getId())); } subs.put(s, subscriptions); } return subs; } }