forked from D-Net/dnet-hadoop
implementeation of the new collector plugin: research_fi
This commit is contained in:
parent
814e650e12
commit
5cdba9172b
|
@ -0,0 +1,72 @@
|
|||
package eu.dnetlib.dhp.collection.plugin.researchfi;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.message.BasicNameValuePair;
|
||||
import org.json.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
|
||||
public class ResearchFiCollectorPlugin implements CollectorPlugin {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ResearchFiCollectorPlugin.class);
|
||||
|
||||
@Override
|
||||
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report)
|
||||
throws CollectorException {
|
||||
|
||||
final String authUrl = api.getParams().get("auth_url");
|
||||
final String clientId = api.getParams().get("auth_client_id");
|
||||
final String clientSecret = api.getParams().get("auth_client_secret");
|
||||
|
||||
final String authToken = authenticate(authUrl, clientId, clientSecret);
|
||||
|
||||
final Iterator<String> iter = new ResearchFiIterator(api.getBaseUrl(), authToken);
|
||||
|
||||
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false);
|
||||
}
|
||||
|
||||
private String authenticate(final String authUrl, final String clientId, final String clientSecret) throws CollectorException {
|
||||
try (final CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
final HttpPost req = new HttpPost(authUrl);
|
||||
final List<NameValuePair> params = new ArrayList<>();
|
||||
params.add(new BasicNameValuePair("grant_type", "client_credentials"));
|
||||
params.add(new BasicNameValuePair("client_id", clientId));
|
||||
params.add(new BasicNameValuePair("client_secret", clientSecret));
|
||||
|
||||
req.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
|
||||
|
||||
try (final CloseableHttpResponse response = client.execute(req)) {
|
||||
final String content = IOUtils.toString(response.getEntity().getContent());
|
||||
final JSONObject obj = new JSONObject(content);
|
||||
final String token = obj.getString("access_token");
|
||||
if (StringUtils.isNotBlank(token)) { return token; }
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
log.warn("Error obtaining access token", e);
|
||||
throw new CollectorException("Error obtaining access token", e);
|
||||
}
|
||||
throw new CollectorException("Access token is missing");
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
package eu.dnetlib.dhp.collection.plugin.researchfi;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.json.JSONArray;
|
||||
|
||||
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
|
||||
public class ResearchFiIterator implements Iterator<String> {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ResearchFiIterator.class);
|
||||
|
||||
private static final int PAGE_SIZE = 100;
|
||||
|
||||
private final String baseUrl;
|
||||
private final String authToken;
|
||||
private int currPage;
|
||||
private int nPages;
|
||||
|
||||
private final Queue<String> queue = new PriorityBlockingQueue<>();
|
||||
|
||||
public ResearchFiIterator(final String baseUrl, final String authToken) {
|
||||
this.baseUrl = baseUrl;
|
||||
this.authToken = authToken;
|
||||
this.currPage = 0;
|
||||
this.nPages = 0;
|
||||
}
|
||||
|
||||
private void verifyStarted() {
|
||||
if (this.currPage == 0) {
|
||||
try {
|
||||
nextCall();
|
||||
} catch (final CollectorException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
synchronized (this.queue) {
|
||||
verifyStarted();
|
||||
return !this.queue.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
synchronized (this.queue) {
|
||||
verifyStarted();
|
||||
final String res = this.queue.poll();
|
||||
while (this.queue.isEmpty() && (this.currPage < this.nPages)) {
|
||||
try {
|
||||
nextCall();
|
||||
} catch (final CollectorException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
private void nextCall() throws CollectorException {
|
||||
|
||||
this.currPage += 1;
|
||||
|
||||
try (final CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
final String url;
|
||||
if (!this.baseUrl.contains("?")) {
|
||||
url = String.format("%s?PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
|
||||
} else if (!this.baseUrl.contains("PageSize=")) {
|
||||
url = String.format("%s&PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
|
||||
} else {
|
||||
url = String.format("%s&PageNumber=%d", this.baseUrl, this.currPage);
|
||||
}
|
||||
log.info("Calling url: " + url);
|
||||
|
||||
final HttpGet req = new HttpGet(url);
|
||||
req.addHeader("Authorization", "Bearer " + this.authToken);
|
||||
try (final CloseableHttpResponse response = client.execute(req)) {
|
||||
for (final Header header : response.getAllHeaders()) {
|
||||
log.debug("HEADER: " + header.getName() + " = " + header.getValue());
|
||||
if ("x-page-count".equals(header.getName())) {
|
||||
final int totalPages = NumberUtils.toInt(header.getValue());
|
||||
if (this.nPages != totalPages) {
|
||||
this.nPages = NumberUtils.toInt(header.getValue());
|
||||
log.info("Total pages: " + totalPages);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final String content = IOUtils.toString(response.getEntity().getContent());
|
||||
final JSONArray jsonArray = new JSONArray(content);
|
||||
|
||||
jsonArray.forEach(obj -> this.queue.add(JsonUtils.convertToXML(obj.toString())));
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
log.warn("Error obtaining access token", e);
|
||||
throw new CollectorException("Error obtaining access token", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package eu.dnetlib.dhp.collection.plugin.researchfi;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.DocumentHelper;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
|
||||
public class ResearchFiCollectorPluginTest {
|
||||
|
||||
private final ResearchFiCollectorPlugin plugin = new ResearchFiCollectorPlugin();
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
void testCollect() throws CollectorException {
|
||||
final ApiDescriptor api = new ApiDescriptor();
|
||||
api.setBaseUrl("https://research.fi/api/rest/v1/funding-decisions?FunderName=AKA&FundingStartYearFrom=2022");
|
||||
api.setProtocol("research_fi");
|
||||
api.getParams().put("auth_url", "https://researchfi-auth.2.rahtiapp.fi/realms/publicapi/protocol/openid-connect/token");
|
||||
api.getParams().put("auth_client_id", "");
|
||||
api.getParams().put("auth_client_secret", "");
|
||||
|
||||
final AtomicLong count = new AtomicLong(0);
|
||||
final Set<String> ids = new HashSet<>();
|
||||
|
||||
this.plugin.collect(api, new AggregatorReport()).forEach(s -> {
|
||||
|
||||
if (count.getAndIncrement() == 0) {
|
||||
System.out.println("First: " + s);
|
||||
}
|
||||
|
||||
try {
|
||||
final String id = DocumentHelper.parseText(s).valueOf("/recordWrap/funderProjectNumber");
|
||||
if (ids.contains(id)) {
|
||||
System.out.println("Id already present: " + id);
|
||||
}
|
||||
ids.add(id);
|
||||
} catch (final DocumentException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
System.out.println("Total records: " + count);
|
||||
System.out.println("Total identifiers: " + ids.size());
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue