implementation of the new collector plugin: research_fi #456

Merged
claudio.atzori merged 2 commits from research_fi_collector_plugin into beta 2024-07-17 10:25:39 +02:00
3 changed files with 243 additions and 0 deletions

View File

@ -0,0 +1,72 @@
package eu.dnetlib.dhp.collection.plugin.researchfi;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class ResearchFiCollectorPlugin implements CollectorPlugin {
private static final Logger log = LoggerFactory.getLogger(ResearchFiCollectorPlugin.class);
@Override
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report)
throws CollectorException {
final String authUrl = api.getParams().get("auth_url");
final String clientId = api.getParams().get("auth_client_id");
final String clientSecret = api.getParams().get("auth_client_secret");
final String authToken = authenticate(authUrl, clientId, clientSecret);
final Iterator<String> iter = new ResearchFiIterator(api.getBaseUrl(), authToken);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false);
}
private String authenticate(final String authUrl, final String clientId, final String clientSecret) throws CollectorException {
try (final CloseableHttpClient client = HttpClients.createDefault()) {
final HttpPost req = new HttpPost(authUrl);
final List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("grant_type", "client_credentials"));
params.add(new BasicNameValuePair("client_id", clientId));
params.add(new BasicNameValuePair("client_secret", clientSecret));
req.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
try (final CloseableHttpResponse response = client.execute(req)) {
final String content = IOUtils.toString(response.getEntity().getContent());
final JSONObject obj = new JSONObject(content);
final String token = obj.getString("access_token");
if (StringUtils.isNotBlank(token)) { return token; }
}
} catch (final Throwable e) {
log.warn("Error obtaining access token", e);
throw new CollectorException("Error obtaining access token", e);
}
throw new CollectorException("Access token is missing");
}
}

View File

@ -0,0 +1,116 @@
package eu.dnetlib.dhp.collection.plugin.researchfi;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.json.JSONArray;
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class ResearchFiIterator implements Iterator<String> {
private static final Log log = LogFactory.getLog(ResearchFiIterator.class);
private static final int PAGE_SIZE = 100;
private final String baseUrl;
private final String authToken;
private int currPage;
private int nPages;
private final Queue<String> queue = new PriorityBlockingQueue<>();
public ResearchFiIterator(final String baseUrl, final String authToken) {
this.baseUrl = baseUrl;
this.authToken = authToken;
this.currPage = 0;
this.nPages = 0;
}
private void verifyStarted() {
if (this.currPage == 0) {
try {
nextCall();
} catch (final CollectorException e) {
throw new IllegalStateException(e);
}
}
}
@Override
public boolean hasNext() {
synchronized (this.queue) {
verifyStarted();
return !this.queue.isEmpty();
}
}
@Override
public String next() {
synchronized (this.queue) {
verifyStarted();
final String res = this.queue.poll();
while (this.queue.isEmpty() && (this.currPage < this.nPages)) {
try {
nextCall();
} catch (final CollectorException e) {
throw new IllegalStateException(e);
}
}
return res;
}
}
private void nextCall() throws CollectorException {
this.currPage += 1;
final String url;
if (!this.baseUrl.contains("?")) {
url = String.format("%s?PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
} else if (!this.baseUrl.contains("PageSize=")) {
url = String.format("%s&PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
} else {
url = String.format("%s&PageNumber=%d", this.baseUrl, this.currPage);
}
log.info("Calling url: " + url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
final HttpGet req = new HttpGet(url);
req.addHeader("Authorization", "Bearer " + this.authToken);
try (final CloseableHttpResponse response = client.execute(req)) {
for (final Header header : response.getAllHeaders()) {
log.debug("HEADER: " + header.getName() + " = " + header.getValue());
if ("x-page-count".equals(header.getName())) {
final int totalPages = NumberUtils.toInt(header.getValue());
if (this.nPages != totalPages) {
this.nPages = NumberUtils.toInt(header.getValue());
log.info("Total pages: " + totalPages);
}
}
}
final String content = IOUtils.toString(response.getEntity().getContent());
final JSONArray jsonArray = new JSONArray(content);
jsonArray.forEach(obj -> this.queue.add(JsonUtils.convertToXML(obj.toString())));
}
} catch (final Throwable e) {
log.warn("Error calling url: " + url, e);
throw new CollectorException("Error calling url: " + url, e);
}
}
}

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.collection.plugin.researchfi;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class ResearchFiCollectorPluginTest {
private final ResearchFiCollectorPlugin plugin = new ResearchFiCollectorPlugin();
@Test
@Disabled
void testCollect() throws CollectorException {
final ApiDescriptor api = new ApiDescriptor();
api.setBaseUrl("https://research.fi/api/rest/v1/funding-decisions?FunderName=AKA&FundingStartYearFrom=2022");
api.setProtocol("research_fi");
api.getParams().put("auth_url", "https://researchfi-auth.2.rahtiapp.fi/realms/publicapi/protocol/openid-connect/token");
api.getParams().put("auth_client_id", "");
api.getParams().put("auth_client_secret", "");
final AtomicLong count = new AtomicLong(0);
final Set<String> ids = new HashSet<>();
this.plugin.collect(api, new AggregatorReport()).forEach(s -> {
if (count.getAndIncrement() == 0) {
System.out.println("First: " + s);
}
try {
final String id = DocumentHelper.parseText(s).valueOf("/recordWrap/funderProjectNumber");
if (ids.contains(id)) {
System.out.println("Id already present: " + id);
}
ids.add(id);
} catch (final DocumentException e) {
throw new RuntimeException(e);
}
});
System.out.println("Total records: " + count);
System.out.println("Total identifiers: " + ids.size());
}
}