1
0
Fork 0

Merge pull request 'support of the new research.fi apis' (#515) from update_researchfi_plugin into beta

Reviewed-on: D-Net/dnet-hadoop#515
This commit is contained in:
Miriam Baglioni 2024-12-04 13:53:01 +01:00
commit 89b7bc84f2
1 changed files with 27 additions and 27 deletions

View File

@ -6,7 +6,7 @@ import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
@ -27,25 +27,25 @@ public class ResearchFiIterator implements Iterator<String> {
private final String baseUrl;
private final String authToken;
private int currPage;
private int nPages;
private String nextUrl;
private int nCalls = 0;
private final Queue<String> queue = new PriorityBlockingQueue<>();
public ResearchFiIterator(final String baseUrl, final String authToken) {
this.baseUrl = baseUrl;
this.authToken = authToken;
this.currPage = 0;
this.nPages = 0;
this.nextUrl = null;
}
private void verifyStarted() {
if (this.currPage == 0) {
try {
nextCall();
} catch (final CollectorException e) {
throw new IllegalStateException(e);
try {
if (this.nCalls == 0) {
this.nextUrl = invokeUrl(this.baseUrl);
}
} catch (final CollectorException e) {
throw new IllegalStateException(e);
}
}
@ -62,9 +62,9 @@ public class ResearchFiIterator implements Iterator<String> {
synchronized (this.queue) {
verifyStarted();
final String res = this.queue.poll();
while (this.queue.isEmpty() && (this.currPage < this.nPages)) {
while (this.queue.isEmpty() && StringUtils.isNotBlank(this.nextUrl)) {
try {
nextCall();
this.nextUrl = invokeUrl(this.nextUrl);
} catch (final CollectorException e) {
throw new IllegalStateException(e);
}
@ -73,18 +73,11 @@ public class ResearchFiIterator implements Iterator<String> {
}
}
private void nextCall() throws CollectorException {
private String invokeUrl(final String url) throws CollectorException {
this.currPage += 1;
this.nCalls += 1;
String next = null;
final String url;
if (!this.baseUrl.contains("?")) {
url = String.format("%s?PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
} else if (!this.baseUrl.contains("PageSize=")) {
url = String.format("%s&PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
} else {
url = String.format("%s&PageNumber=%d", this.baseUrl, this.currPage);
}
log.info("Calling url: " + url);
try (final CloseableHttpClient client = HttpClients.createDefault()) {
@ -94,11 +87,15 @@ public class ResearchFiIterator implements Iterator<String> {
try (final CloseableHttpResponse response = client.execute(req)) {
for (final Header header : response.getAllHeaders()) {
log.debug("HEADER: " + header.getName() + " = " + header.getValue());
if ("x-page-count".equals(header.getName())) {
final int totalPages = NumberUtils.toInt(header.getValue());
if (this.nPages != totalPages) {
this.nPages = NumberUtils.toInt(header.getValue());
log.info("Total pages: " + totalPages);
if ("link".equals(header.getName())) {
final String s = StringUtils.substringBetween(header.getValue(), "<", ">");
final String token = StringUtils
.substringBefore(StringUtils.substringAfter(s, "NextPageToken="), "&");
if (this.baseUrl.contains("?")) {
next = this.baseUrl + "&NextPageToken=" + token;
} else {
next = this.baseUrl + "?NextPageToken=" + token;
}
}
}
@ -108,6 +105,9 @@ public class ResearchFiIterator implements Iterator<String> {
jsonArray.forEach(obj -> this.queue.add(JsonUtils.convertToXML(obj.toString())));
}
return next;
} catch (final Throwable e) {
log.warn("Error calling url: " + url, e);
throw new CollectorException("Error calling url: " + url, e);