diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiCollectorPlugin.java
new file mode 100644
index 000000000..c5961c598
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiCollectorPlugin.java
@@ -0,0 +1,72 @@
+package eu.dnetlib.dhp.collection.plugin.researchfi;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+public class ResearchFiCollectorPlugin implements CollectorPlugin {
+
+    private static final Logger log = LoggerFactory.getLogger(ResearchFiCollectorPlugin.class);
+
+    @Override
+    public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report)
+        throws CollectorException {
+
+        final String authUrl = api.getParams().get("auth_url");
+        final String clientId = api.getParams().get("auth_client_id");
+        final String clientSecret = api.getParams().get("auth_client_secret");
+
+        final String authToken = authenticate(authUrl, clientId, clientSecret);
+
+        final Iterator<String> iter = new ResearchFiIterator(api.getBaseUrl(), authToken);
+
+        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false);
+    }
+
+    private String authenticate(final String authUrl, final String clientId, final String clientSecret) throws CollectorException {
+        try (final CloseableHttpClient client = HttpClients.createDefault()) {
+            final HttpPost req = new HttpPost(authUrl);
+            final List<NameValuePair> params = new ArrayList<>();
+            params.add(new BasicNameValuePair("grant_type", "client_credentials"));
+            params.add(new BasicNameValuePair("client_id", clientId));
+            params.add(new BasicNameValuePair("client_secret", clientSecret));
+
+            req.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
+
+            try (final CloseableHttpResponse response = client.execute(req)) {
+                final String content = IOUtils.toString(response.getEntity().getContent());
+                final JSONObject obj = new JSONObject(content);
+                final String token = obj.getString("access_token");
+                if (StringUtils.isNotBlank(token)) { return token; }
+            }
+        } catch (final Throwable e) {
+            log.warn("Error obtaining access token", e);
+            throw new CollectorException("Error obtaining access token", e);
+        }
+        throw new CollectorException("Access token is missing");
+
+    }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiIterator.java
new file mode 100644
index 000000000..38a3cece8
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiIterator.java
@@ -0,0 +1,115 @@
+package eu.dnetlib.dhp.collection.plugin.researchfi;
+
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.Header;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.json.JSONArray;
+
+import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+public class ResearchFiIterator implements Iterator<String> {
+
+    private static final Log log = LogFactory.getLog(ResearchFiIterator.class);
+
+    private static final int PAGE_SIZE = 100;
+
+    private final String baseUrl;
+    private final String authToken;
+    private int currPage;
+    private int nPages;
+
+    private final Queue<String> queue = new PriorityBlockingQueue<>();
+
+    public ResearchFiIterator(final String baseUrl, final String authToken) {
+        this.baseUrl = baseUrl;
+        this.authToken = authToken;
+        this.currPage = 0;
+        this.nPages = 0;
+    }
+
+    private void verifyStarted() {
+        if (this.currPage == 0) {
+            try {
+                nextCall();
+            } catch (final CollectorException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        synchronized (this.queue) {
+            verifyStarted();
+            return !this.queue.isEmpty();
+        }
+    }
+
+    @Override
+    public String next() {
+        synchronized (this.queue) {
+            verifyStarted();
+            final String res = this.queue.poll();
+            while (this.queue.isEmpty() && (this.currPage < this.nPages)) {
+                try {
+                    nextCall();
+                } catch (final CollectorException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+            return res;
+        }
+    }
+
+    private void nextCall() throws CollectorException {
+
+        this.currPage += 1;
+
+        try (final CloseableHttpClient client = HttpClients.createDefault()) {
+            final String url;
+            if (!this.baseUrl.contains("?")) {
+                url = String.format("%s?PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
+            } else if (!this.baseUrl.contains("PageSize=")) {
+                url = String.format("%s&PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
+            } else {
+                url = String.format("%s&PageNumber=%d", this.baseUrl, this.currPage);
+            }
+            log.info("Calling url: " + url);
+
+            final HttpGet req = new HttpGet(url);
+            req.addHeader("Authorization", "Bearer " + this.authToken);
+            try (final CloseableHttpResponse response = client.execute(req)) {
+                for (final Header header : response.getAllHeaders()) {
+                    log.debug("HEADER: " + header.getName() + " = " + header.getValue());
+                    if ("x-page-count".equals(header.getName())) {
+                        final int totalPages = NumberUtils.toInt(header.getValue());
+                        if (this.nPages != totalPages) {
+                            this.nPages = NumberUtils.toInt(header.getValue());
+                            log.info("Total pages: " + totalPages);
+                        }
+                    }
+                }
+
+                final String content = IOUtils.toString(response.getEntity().getContent());
+                final JSONArray jsonArray = new JSONArray(content);
+
+                jsonArray.forEach(obj -> this.queue.add(JsonUtils.convertToXML(obj.toString())));
+            }
+        } catch (final Throwable e) {
+            log.warn("Error calling the ResearchFi API", e);
+            throw new CollectorException("Error calling the ResearchFi API", e);
+        }
+    }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiCollectorPluginTest.java
new file mode 100644
index 000000000..47c77796b
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/researchfi/ResearchFiCollectorPluginTest.java
@@ -0,0 +1,55 @@
+package eu.dnetlib.dhp.collection.plugin.researchfi;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.dom4j.DocumentException;
+import org.dom4j.DocumentHelper;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+public class ResearchFiCollectorPluginTest {
+
+    private final ResearchFiCollectorPlugin plugin = new ResearchFiCollectorPlugin();
+
+    @Test
+    @Disabled
+    void testCollect() throws CollectorException {
+        final ApiDescriptor api = new ApiDescriptor();
+        api.setBaseUrl("https://research.fi/api/rest/v1/funding-decisions?FunderName=AKA&FundingStartYearFrom=2022");
+        api.setProtocol("research_fi");
+        api.getParams().put("auth_url", "https://researchfi-auth.2.rahtiapp.fi/realms/publicapi/protocol/openid-connect/token");
+        api.getParams().put("auth_client_id", "");
+        api.getParams().put("auth_client_secret", "");
+
+        final AtomicLong count = new AtomicLong(0);
+        final Set<String> ids = new HashSet<>();
+
+        this.plugin.collect(api, new AggregatorReport()).forEach(s -> {
+
+            if (count.getAndIncrement() == 0) {
+                System.out.println("First: " + s);
+            }
+
+            try {
+                final String id = DocumentHelper.parseText(s).valueOf("/recordWrap/funderProjectNumber");
+                if (ids.contains(id)) {
+                    System.out.println("Id already present: " + id);
+                }
+                ids.add(id);
+            } catch (final DocumentException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        System.out.println("Total records: " + count);
+        System.out.println("Total identifiers: " + ids.size());
+
+    }
+
+}