1
0
Fork 0

Merge pull request 'rest-collector-plugin-with-retry' (#432) from rest-collector-plugin-with-retry into beta

Reviewed-on: D-Net/dnet-hadoop#432
This commit is contained in:
Claudio Atzori 2024-05-10 09:02:33 +02:00
commit 53e7bb4336
2 changed files with 248 additions and 194 deletions

View File

@ -18,7 +18,11 @@ import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory; import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource; import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.*; import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -35,7 +39,7 @@ import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams; import eu.dnetlib.dhp.common.collection.HttpClientParams;
/** /**
* log.info(...) equal to log.trace(...) in the application-logs * log.info(...) equal to log.trace(...) in the application-logs
* <p> * <p>
* known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue * known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue
* *
@ -47,6 +51,7 @@ public class RestIterator implements Iterator<String> {
private static final Logger log = LoggerFactory.getLogger(RestIterator.class); private static final Logger log = LoggerFactory.getLogger(RestIterator.class);
public static final String UTF_8 = "UTF-8"; public static final String UTF_8 = "UTF-8";
private static final int MAX_ATTEMPTS = 5;
private final HttpClientParams clientParams; private final HttpClientParams clientParams;
@ -60,8 +65,9 @@ public class RestIterator implements Iterator<String> {
private final int resultSizeValue; private final int resultSizeValue;
private int resumptionInt = 0; // integer resumption token (first record to harvest) private int resumptionInt = 0; // integer resumption token (first record to harvest)
private int resultTotal = -1; private int resultTotal = -1;
private String resumptionStr = Integer.toString(resumptionInt); // string resumption token (first record to harvest private String resumptionStr = Integer.toString(this.resumptionInt); // string resumption token (first record to
// or token scanned from results) // harvest
// or token scanned from results)
private InputStream resultStream; private InputStream resultStream;
private Transformer transformer; private Transformer transformer;
private XPath xpath; private XPath xpath;
@ -73,7 +79,7 @@ public class RestIterator implements Iterator<String> {
private final String querySize; private final String querySize;
private final String authMethod; private final String authMethod;
private final String authToken; private final String authToken;
private final Queue<String> recordQueue = new PriorityBlockingQueue<String>(); private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
private int discoverResultSize = 0; private int discoverResultSize = 0;
private int pagination = 1; private int pagination = 1;
/* /*
@ -83,8 +89,8 @@ public class RestIterator implements Iterator<String> {
*/ */
private final String resultOutputFormat; private final String resultOutputFormat;
/** RestIterator class /**
* compatible to version 1.3.33 * RestIterator class compatible to version 1.3.33
*/ */
public RestIterator( public RestIterator(
final HttpClientParams clientParams, final HttpClientParams clientParams,
@ -108,40 +114,42 @@ public class RestIterator implements Iterator<String> {
this.resumptionType = resumptionType; this.resumptionType = resumptionType;
this.resumptionParam = resumptionParam; this.resumptionParam = resumptionParam;
this.resultFormatValue = resultFormatValue; this.resultFormatValue = resultFormatValue;
this.resultSizeValue = Integer.valueOf(resultSizeValueStr); this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
this.queryParams = queryParams; this.queryParams = queryParams;
this.authMethod = authMethod; this.authMethod = authMethod;
this.authToken = authToken; this.authToken = authToken;
this.resultOutputFormat = resultOutputFormat; this.resultOutputFormat = resultOutputFormat;
queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue this.queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue
: "";
this.querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr
: ""; : "";
querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
try { try {
initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath); initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
} catch (Exception e) { } catch (final Exception e) {
throw new IllegalStateException("xml transformation init failed: " + e.getMessage()); throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
} }
initQueue(); initQueue();
} }
private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) private void initXmlTransformation(final String resultTotalXpath, final String resumptionXpath,
final String entityXpath)
throws TransformerConfigurationException, XPathExpressionException { throws TransformerConfigurationException, XPathExpressionException {
final TransformerFactory factory = TransformerFactory.newInstance(); final TransformerFactory factory = TransformerFactory.newInstance();
transformer = factory.newTransformer(); this.transformer = factory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes"); this.transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3"); this.transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
xpath = XPathFactory.newInstance().newXPath(); this.xpath = XPathFactory.newInstance().newXPath();
xprResultTotalPath = xpath.compile(resultTotalXpath); this.xprResultTotalPath = this.xpath.compile(resultTotalXpath);
xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath); this.xprResumptionPath = this.xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
xprEntity = xpath.compile(entityXpath); this.xprEntity = this.xpath.compile(entityXpath);
} }
private void initQueue() { private void initQueue() {
query = baseUrl + "?" + queryParams + querySize + queryFormat; this.query = this.baseUrl + "?" + this.queryParams + this.querySize + this.queryFormat;
log.info("REST calls starting with {}", query); log.info("REST calls starting with {}", this.query);
} }
private void disconnect() { private void disconnect() {
@ -154,12 +162,11 @@ public class RestIterator implements Iterator<String> {
*/ */
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (recordQueue.isEmpty() && query.isEmpty()) { if (this.recordQueue.isEmpty() && this.query.isEmpty()) {
disconnect(); disconnect();
return false; return false;
} else {
return true;
} }
return true;
} }
/* /*
@ -168,214 +175,241 @@ public class RestIterator implements Iterator<String> {
*/ */
@Override @Override
public String next() { public String next() {
synchronized (recordQueue) { synchronized (this.recordQueue) {
while (recordQueue.isEmpty() && !query.isEmpty()) { while (this.recordQueue.isEmpty() && !this.query.isEmpty()) {
try { try {
query = downloadPage(query); this.query = downloadPage(this.query, 0);
} catch (CollectorException e) { } catch (final CollectorException e) {
log.debug("CollectorPlugin.next()-Exception: {}", e); log.debug("CollectorPlugin.next()-Exception: {}", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
return recordQueue.poll(); return this.recordQueue.poll();
} }
} }
/* /*
* download page and return nextQuery * download page and return nextQuery (with number of attempt)
*/ */
private String downloadPage(String query) throws CollectorException { private String downloadPage(String query, final int attempt) throws CollectorException {
String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
String nextQuery = "";
String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
Node resultNode = null;
NodeList nodeList = null;
String qUrlArgument = "";
int urlOldResumptionSize = 0;
InputStream theHttpInputStream;
// check if cursor=* is initial set otherwise add it to the queryParam URL if (attempt > MAX_ATTEMPTS) {
if (resumptionType.equalsIgnoreCase("deep-cursor")) { throw new CollectorException("Max Number of attempts reached, query:" + query);
log.debug("check resumptionType deep-cursor and check cursor=*?{}", query); }
if (!query.contains("&cursor=")) {
query += "&cursor=*"; if (attempt > 0) {
final int delay = (attempt * 5000);
log.debug("Attempt {} with delay {}", attempt, delay);
try {
Thread.sleep(delay);
} catch (final InterruptedException e) {
new CollectorException(e);
} }
} }
try { try {
log.info("requestig URL [{}]", query); String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
String nextQuery = "";
final String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
Node resultNode = null;
NodeList nodeList = null;
String qUrlArgument = "";
int urlOldResumptionSize = 0;
InputStream theHttpInputStream;
URL qUrl = new URL(query); // check if cursor=* is initial set otherwise add it to the queryParam URL
log.debug("authMethod: {}", authMethod); if ("deep-cursor".equalsIgnoreCase(this.resumptionType)) {
if ("bearer".equalsIgnoreCase(this.authMethod)) { log.debug("check resumptionType deep-cursor and check cursor=*?{}", query);
log.trace("authMethod before inputStream: {}", resultXml); if (!query.contains("&cursor=")) {
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); query += "&cursor=*";
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken);
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else if (BASIC.equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: {}", resultXml);
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken);
conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else {
theHttpInputStream = qUrl.openStream();
}
resultStream = theHttpInputStream;
if ("json".equals(resultOutputFormat)) {
resultJson = IOUtils.toString(resultStream, StandardCharsets.UTF_8);
resultXml = JsonUtils.convertToXML(resultJson);
resultStream = IOUtils.toInputStream(resultXml, UTF_8);
}
if (!(emptyXml).equalsIgnoreCase(resultXml)) {
resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
log.debug("nodeList.length: {}", nodeList.getLength());
for (int i = 0; i < nodeList.getLength(); i++) {
StringWriter sw = new StringWriter();
transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
String toEnqueue = sw.toString();
if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) {
log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml);
} else {
recordQueue.add(sw.toString());
}
} }
} else {
log.warn("resultXml is equal with emptyXml");
} }
resumptionInt += resultSizeValue; try {
log.info("requesting URL [{}]", query);
switch (resumptionType.toLowerCase()) { final URL qUrl = new URL(query);
case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items log.debug("authMethod: {}", this.authMethod);
resumptionStr = xprResumptionPath.evaluate(resultNode); if ("bearer".equalsIgnoreCase(this.authMethod)) {
break; log.trace("authMethod before inputStream: {}", resultXml);
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + this.authToken);
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else if (this.BASIC.equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: {}", resultXml);
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + this.authToken);
conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else {
theHttpInputStream = qUrl.openStream();
}
case "count": // begin at one step for all records, iterate over items this.resultStream = theHttpInputStream;
resumptionStr = Integer.toString(resumptionInt); if ("json".equals(this.resultOutputFormat)) {
break; resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8);
resultXml = JsonUtils.convertToXML(resultJson);
this.resultStream = IOUtils.toInputStream(resultXml, UTF_8);
}
case "discover": // size of result items unknown, iterate over items (for openDOAR - 201808) if (!(emptyXml).equalsIgnoreCase(resultXml)) {
if (resultSizeValue < 2) { resultNode = (Node) this.xpath
throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); .evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE);
nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET);
log.debug("nodeList.length: {}", nodeList.getLength());
for (int i = 0; i < nodeList.getLength(); i++) {
final StringWriter sw = new StringWriter();
this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
final String toEnqueue = sw.toString();
if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue)
|| emptyXml.equalsIgnoreCase(toEnqueue)) {
log
.warn(
"The following record resulted in empty item for the feeding queue: {}", resultXml);
} else {
this.recordQueue.add(sw.toString());
}
} }
qUrlArgument = qUrl.getQuery(); } else {
String[] arrayQUrlArgument = qUrlArgument.split("&"); log.warn("resultXml is equal with emptyXml");
for (String arrayUrlArgStr : arrayQUrlArgument) { }
if (arrayUrlArgStr.startsWith(resumptionParam)) {
String[] resumptionKeyValue = arrayUrlArgStr.split("="); this.resumptionInt += this.resultSizeValue;
if (isInteger(resumptionKeyValue[1])) {
urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]); switch (this.resumptionType.toLowerCase()) {
log.debug("discover OldResumptionSize from Url (int): {}", urlOldResumptionSize); case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
} else { this.resumptionStr = this.xprResumptionPath.evaluate(resultNode);
log.debug("discover OldResumptionSize from Url (str): {}", resumptionKeyValue[1]); break;
case "count": // begin at one step for all records, iterate over items
this.resumptionStr = Integer.toString(this.resumptionInt);
break;
case "discover": // size of result items unknown, iterate over items (for openDOAR - 201808)
if (this.resultSizeValue < 2) {
throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2");
}
qUrlArgument = qUrl.getQuery();
final String[] arrayQUrlArgument = qUrlArgument.split("&");
for (final String arrayUrlArgStr : arrayQUrlArgument) {
if (arrayUrlArgStr.startsWith(this.resumptionParam)) {
final String[] resumptionKeyValue = arrayUrlArgStr.split("=");
if (isInteger(resumptionKeyValue[1])) {
urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
log.debug("discover OldResumptionSize from Url (int): {}", urlOldResumptionSize);
} else {
log.debug("discover OldResumptionSize from Url (str): {}", resumptionKeyValue[1]);
}
} }
} }
}
if (((emptyXml).equalsIgnoreCase(resultXml)) if (((emptyXml).equalsIgnoreCase(resultXml))
|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))) { || ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) {
// resumptionStr = ""; // resumptionStr = "";
if (nodeList != null) { if (nodeList != null) {
discoverResultSize += nodeList.getLength(); this.discoverResultSize += nodeList.getLength();
}
this.resultTotal = this.discoverResultSize;
} else {
this.resumptionStr = Integer.toString(this.resumptionInt);
this.resultTotal = this.resumptionInt + 1;
if (nodeList != null) {
this.discoverResultSize += nodeList.getLength();
}
} }
resultTotal = discoverResultSize; log.info("discoverResultSize: {}", this.discoverResultSize);
} else { break;
resumptionStr = Integer.toString(resumptionInt);
resultTotal = resumptionInt + 1; case "pagination":
case "page": // pagination, iterate over page numbers
this.pagination += 1;
if (nodeList != null) { if (nodeList != null) {
discoverResultSize += nodeList.getLength(); this.discoverResultSize += nodeList.getLength();
} else {
this.resultTotal = this.discoverResultSize;
this.pagination = this.discoverResultSize;
} }
} this.resumptionInt = this.pagination;
log.info("discoverResultSize: {}", discoverResultSize); this.resumptionStr = Integer.toString(this.resumptionInt);
break; break;
case "pagination": case "deep-cursor": // size of result items unknown, iterate over items (for supporting deep cursor
case "page": // pagination, iterate over page numbers // in
pagination += 1; // solr)
if (nodeList != null) { // isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode:
discoverResultSize += nodeList.getLength(); // deep-cursor, Param 'resultSizeValue' is less than 2");}
} else {
resultTotal = discoverResultSize;
pagination = discoverResultSize;
}
resumptionInt = pagination;
resumptionStr = Integer.toString(resumptionInt);
break;
case "deep-cursor": // size of result items unknown, iterate over items (for supporting deep cursor in this.resumptionStr = encodeValue(this.xprResumptionPath.evaluate(resultNode));
// solr) this.queryParams = this.queryParams.replace("&cursor=*", "");
// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode:
// deep-cursor, Param 'resultSizeValue' is less than 2");}
resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode)); // terminating if length of nodeList is 0
queryParams = queryParams.replace("&cursor=*", ""); if ((nodeList != null) && (nodeList.getLength() < this.discoverResultSize)) {
this.resumptionInt += ((nodeList.getLength() + 1) - this.resultSizeValue);
} else {
this.resumptionInt += (nodeList.getLength() - this.resultSizeValue); // subtract the
// resultSizeValue
// because the iteration is over
// real length and the
// resultSizeValue is added before
// the switch()
}
// terminating if length of nodeList is 0 this.discoverResultSize = nodeList.getLength();
if ((nodeList != null) && (nodeList.getLength() < discoverResultSize)) {
resumptionInt += (nodeList.getLength() + 1 - resultSizeValue);
} else {
resumptionInt += (nodeList.getLength() - resultSizeValue); // subtract the resultSizeValue
// because the iteration is over
// real length and the
// resultSizeValue is added before
// the switch()
}
discoverResultSize = nodeList.getLength(); log
.debug(
"downloadPage().deep-cursor: resumptionStr=" + this.resumptionStr + " ; queryParams="
+ this.queryParams + " resumptionLengthIncreased: " + this.resumptionInt);
log break;
.debug(
"downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams="
+ queryParams + " resumptionLengthIncreased: " + resumptionInt);
break; default: // otherwise: abort
// resultTotal = resumptionInt;
break;
}
default: // otherwise: abort } catch (final Exception e) {
// resultTotal = resumptionInt; log.error(e.getMessage(), e);
break; throw new IllegalStateException("collection failed: " + e.getMessage());
} }
} catch (Exception e) { try {
log.error(e.getMessage(), e); if (this.resultTotal == -1) {
throw new IllegalStateException("collection failed: " + e.getMessage()); this.resultTotal = Integer.parseInt(this.xprResultTotalPath.evaluate(resultNode));
} if ("page".equalsIgnoreCase(this.resumptionType) && !this.BASIC.equalsIgnoreCase(this.authMethod)) {
this.resultTotal += 1;
try { } // to correct the upper bound
if (resultTotal == -1) { log.info("resultTotal was -1 is now: " + this.resultTotal);
resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode)); }
if (resumptionType.equalsIgnoreCase("page") && !BASIC.equalsIgnoreCase(authMethod)) { } catch (final Exception e) {
resultTotal += 1; log.error(e.getMessage(), e);
} // to correct the upper bound throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
log.info("resultTotal was -1 is now: " + resultTotal);
} }
} catch (Exception e) { log.debug("resultTotal: " + this.resultTotal);
log.error(e.getMessage(), e); log.debug("resInt: " + this.resumptionInt);
throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); if (this.resumptionInt <= this.resultTotal) {
nextQuery = this.baseUrl + "?" + this.queryParams + this.querySize + "&" + this.resumptionParam + "="
+ this.resumptionStr
+ this.queryFormat;
} else {
nextQuery = "";
// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the
// resumptionInt and prevent a NullPointer Exception at mdStore
}
log.debug("nextQueryUrl: " + nextQuery);
return nextQuery;
} catch (final Throwable e) {
log.warn(e.getMessage(), e);
return downloadPage(query, attempt + 1);
} }
log.debug("resultTotal: " + resultTotal);
log.debug("resInt: " + resumptionInt);
if (resumptionInt <= resultTotal) {
nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr
+ queryFormat;
} else {
nextQuery = "";
// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the
// resumptionInt and prevent a NullPointer Exception at mdStore
}
log.debug("nextQueryUrl: " + nextQuery);
return nextQuery;
} }
private boolean isInteger(String s) { private boolean isInteger(final String s) {
boolean isValidInteger = false; boolean isValidInteger = false;
try { try {
Integer.parseInt(s); Integer.parseInt(s);
@ -383,7 +417,7 @@ public class RestIterator implements Iterator<String> {
// s is a valid integer // s is a valid integer
isValidInteger = true; isValidInteger = true;
} catch (NumberFormatException ex) { } catch (final NumberFormatException ex) {
// s is not an integer // s is not an integer
} }
@ -391,20 +425,20 @@ public class RestIterator implements Iterator<String> {
} }
// Method to encode a string value using `UTF-8` encoding scheme // Method to encode a string value using `UTF-8` encoding scheme
private String encodeValue(String value) { private String encodeValue(final String value) {
try { try {
return URLEncoder.encode(value, StandardCharsets.UTF_8.toString()); return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException ex) { } catch (final UnsupportedEncodingException ex) {
throw new RuntimeException(ex.getCause()); throw new RuntimeException(ex.getCause());
} }
} }
public String getResultFormatValue() { public String getResultFormatValue() {
return resultFormatValue; return this.resultFormatValue;
} }
public String getResultOutputFormat() { public String getResultOutputFormat() {
return resultOutputFormat; return this.resultOutputFormat;
} }
} }

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection.plugin.rest;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -69,7 +70,7 @@ public class OsfPreprintCollectorTest {
@Test @Test
@Disabled @Disabled
void test() throws CollectorException { void test_limited() throws CollectorException {
final AtomicInteger i = new AtomicInteger(0); final AtomicInteger i = new AtomicInteger(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport()); final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
@ -82,4 +83,23 @@ public class OsfPreprintCollectorTest {
log.info("{}", i.intValue()); log.info("{}", i.intValue());
Assertions.assertTrue(i.intValue() > 0); Assertions.assertTrue(i.intValue() > 0);
} }
@Test
@Disabled
void test_all() throws CollectorException {
final AtomicLong i = new AtomicLong(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
stream.forEach(s -> {
Assertions.assertTrue(s.length() > 0);
if ((i.incrementAndGet() % 1000) == 0) {
log.info("COLLECTED: {}", i.get());
}
});
log.info("TOTAL: {}", i.get());
Assertions.assertTrue(i.get() > 0);
}
} }