forked from D-Net/dnet-hadoop
support of the new apis
This commit is contained in:
parent
cc6bbbb804
commit
65902a87e3
|
@ -6,7 +6,7 @@ import java.util.Queue;
|
||||||
import java.util.concurrent.PriorityBlockingQueue;
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.math.NumberUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.http.Header;
|
import org.apache.http.Header;
|
||||||
|
@ -27,27 +27,27 @@ public class ResearchFiIterator implements Iterator<String> {
|
||||||
|
|
||||||
private final String baseUrl;
|
private final String baseUrl;
|
||||||
private final String authToken;
|
private final String authToken;
|
||||||
private int currPage;
|
private String nextUrl;
|
||||||
private int nPages;
|
private int nCalls = 0;
|
||||||
|
|
||||||
private final Queue<String> queue = new PriorityBlockingQueue<>();
|
private final Queue<String> queue = new PriorityBlockingQueue<>();
|
||||||
|
|
||||||
public ResearchFiIterator(final String baseUrl, final String authToken) {
|
public ResearchFiIterator(final String baseUrl, final String authToken) {
|
||||||
this.baseUrl = baseUrl;
|
this.baseUrl = baseUrl;
|
||||||
this.authToken = authToken;
|
this.authToken = authToken;
|
||||||
this.currPage = 0;
|
this.nextUrl = null;
|
||||||
this.nPages = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyStarted() {
|
private void verifyStarted() {
|
||||||
if (this.currPage == 0) {
|
|
||||||
try {
|
try {
|
||||||
nextCall();
|
if (this.nCalls == 0) {
|
||||||
|
this.nextUrl = invokeUrl(this.baseUrl);
|
||||||
|
}
|
||||||
} catch (final CollectorException e) {
|
} catch (final CollectorException e) {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
|
@ -62,9 +62,9 @@ public class ResearchFiIterator implements Iterator<String> {
|
||||||
synchronized (this.queue) {
|
synchronized (this.queue) {
|
||||||
verifyStarted();
|
verifyStarted();
|
||||||
final String res = this.queue.poll();
|
final String res = this.queue.poll();
|
||||||
while (this.queue.isEmpty() && (this.currPage < this.nPages)) {
|
while (this.queue.isEmpty() && StringUtils.isNotBlank(this.nextUrl)) {
|
||||||
try {
|
try {
|
||||||
nextCall();
|
this.nextUrl = invokeUrl(this.nextUrl);
|
||||||
} catch (final CollectorException e) {
|
} catch (final CollectorException e) {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
}
|
}
|
||||||
|
@ -73,18 +73,11 @@ public class ResearchFiIterator implements Iterator<String> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void nextCall() throws CollectorException {
|
private String invokeUrl(final String url) throws CollectorException {
|
||||||
|
|
||||||
this.currPage += 1;
|
this.nCalls += 1;
|
||||||
|
String next = null;
|
||||||
|
|
||||||
final String url;
|
|
||||||
if (!this.baseUrl.contains("?")) {
|
|
||||||
url = String.format("%s?PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
|
|
||||||
} else if (!this.baseUrl.contains("PageSize=")) {
|
|
||||||
url = String.format("%s&PageNumber=%d&PageSize=%d", this.baseUrl, this.currPage, PAGE_SIZE);
|
|
||||||
} else {
|
|
||||||
url = String.format("%s&PageNumber=%d", this.baseUrl, this.currPage);
|
|
||||||
}
|
|
||||||
log.info("Calling url: " + url);
|
log.info("Calling url: " + url);
|
||||||
|
|
||||||
try (final CloseableHttpClient client = HttpClients.createDefault()) {
|
try (final CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
@ -94,11 +87,15 @@ public class ResearchFiIterator implements Iterator<String> {
|
||||||
try (final CloseableHttpResponse response = client.execute(req)) {
|
try (final CloseableHttpResponse response = client.execute(req)) {
|
||||||
for (final Header header : response.getAllHeaders()) {
|
for (final Header header : response.getAllHeaders()) {
|
||||||
log.debug("HEADER: " + header.getName() + " = " + header.getValue());
|
log.debug("HEADER: " + header.getName() + " = " + header.getValue());
|
||||||
if ("x-page-count".equals(header.getName())) {
|
if ("link".equals(header.getName())) {
|
||||||
final int totalPages = NumberUtils.toInt(header.getValue());
|
final String s = StringUtils.substringBetween(header.getValue(), "<", ">");
|
||||||
if (this.nPages != totalPages) {
|
final String token = StringUtils
|
||||||
this.nPages = NumberUtils.toInt(header.getValue());
|
.substringBefore(StringUtils.substringAfter(s, "NextPageToken="), "&");
|
||||||
log.info("Total pages: " + totalPages);
|
|
||||||
|
if (this.baseUrl.contains("?")) {
|
||||||
|
next = this.baseUrl + "&NextPageToken=" + token;
|
||||||
|
} else {
|
||||||
|
next = this.baseUrl + "?NextPageToken=" + token;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,6 +105,9 @@ public class ResearchFiIterator implements Iterator<String> {
|
||||||
|
|
||||||
jsonArray.forEach(obj -> this.queue.add(JsonUtils.convertToXML(obj.toString())));
|
jsonArray.forEach(obj -> this.queue.add(JsonUtils.convertToXML(obj.toString())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return next;
|
||||||
|
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
log.warn("Error calling url: " + url, e);
|
log.warn("Error calling url: " + url, e);
|
||||||
throw new CollectorException("Error calling url: " + url, e);
|
throw new CollectorException("Error calling url: " + url, e);
|
||||||
|
|
Loading…
Reference in New Issue