diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java index 1107bcf46e..76af6cff1a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java @@ -18,7 +18,11 @@ import javax.xml.transform.TransformerConfigurationException; import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; -import javax.xml.xpath.*; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -35,7 +39,7 @@ import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.HttpClientParams; /** - * log.info(...) equal to log.trace(...) in the application-logs + * log.info(...) equal to log.trace(...) in the application-logs *

* known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue * @@ -47,6 +51,7 @@ public class RestIterator implements Iterator { private static final Logger log = LoggerFactory.getLogger(RestIterator.class); public static final String UTF_8 = "UTF-8"; + private static final int MAX_ATTEMPTS = 5; private final HttpClientParams clientParams; @@ -60,8 +65,9 @@ public class RestIterator implements Iterator { private final int resultSizeValue; private int resumptionInt = 0; // integer resumption token (first record to harvest) private int resultTotal = -1; - private String resumptionStr = Integer.toString(resumptionInt); // string resumption token (first record to harvest - // or token scanned from results) + private String resumptionStr = Integer.toString(this.resumptionInt); // string resumption token (first record to + // harvest + // or token scanned from results) private InputStream resultStream; private Transformer transformer; private XPath xpath; @@ -73,7 +79,7 @@ public class RestIterator implements Iterator { private final String querySize; private final String authMethod; private final String authToken; - private final Queue recordQueue = new PriorityBlockingQueue(); + private final Queue recordQueue = new PriorityBlockingQueue<>(); private int discoverResultSize = 0; private int pagination = 1; /* @@ -83,8 +89,8 @@ public class RestIterator implements Iterator { */ private final String resultOutputFormat; - /** RestIterator class - * compatible to version 1.3.33 + /** + * RestIterator class compatible to version 1.3.33 */ public RestIterator( final HttpClientParams clientParams, @@ -108,40 +114,42 @@ public class RestIterator implements Iterator { this.resumptionType = resumptionType; this.resumptionParam = resumptionParam; this.resultFormatValue = resultFormatValue; - this.resultSizeValue = Integer.valueOf(resultSizeValueStr); + this.resultSizeValue = Integer.parseInt(resultSizeValueStr); this.queryParams = queryParams; this.authMethod = authMethod; this.authToken = authToken; this.resultOutputFormat = resultOutputFormat; - queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue + this.queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue + : ""; + this.querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : ""; - querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : ""; try { initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath); - } catch (Exception e) { + } catch (final Exception e) { throw new IllegalStateException("xml transformation init failed: " + e.getMessage()); } initQueue(); } - private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) + private void initXmlTransformation(final String resultTotalXpath, final String resumptionXpath, + final String entityXpath) throws TransformerConfigurationException, XPathExpressionException { final TransformerFactory factory = TransformerFactory.newInstance(); - transformer = factory.newTransformer(); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3"); - xpath = XPathFactory.newInstance().newXPath(); - xprResultTotalPath = xpath.compile(resultTotalXpath); - xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath); - xprEntity = xpath.compile(entityXpath); + this.transformer = factory.newTransformer(); + this.transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + this.transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3"); + this.xpath = XPathFactory.newInstance().newXPath(); + this.xprResultTotalPath = this.xpath.compile(resultTotalXpath); + this.xprResumptionPath = this.xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath); + this.xprEntity = this.xpath.compile(entityXpath); } private void initQueue() { - query = baseUrl + "?" + queryParams + querySize + queryFormat; - log.info("REST calls starting with {}", query); + this.query = this.baseUrl + "?" + this.queryParams + this.querySize + this.queryFormat; + log.info("REST calls starting with {}", this.query); } private void disconnect() { @@ -154,12 +162,11 @@ public class RestIterator implements Iterator { */ @Override public boolean hasNext() { - if (recordQueue.isEmpty() && query.isEmpty()) { + if (this.recordQueue.isEmpty() && this.query.isEmpty()) { disconnect(); return false; - } else { - return true; } + return true; } /* @@ -168,214 +175,241 @@ public class RestIterator implements Iterator { */ @Override public String next() { - synchronized (recordQueue) { - while (recordQueue.isEmpty() && !query.isEmpty()) { + synchronized (this.recordQueue) { + while (this.recordQueue.isEmpty() && !this.query.isEmpty()) { try { - query = downloadPage(query); - } catch (CollectorException e) { + this.query = downloadPage(this.query, 0); + } catch (final CollectorException e) { log.debug("CollectorPlugin.next()-Exception: {}", e); throw new RuntimeException(e); } } - return recordQueue.poll(); + return this.recordQueue.poll(); } } /* - * download page and return nextQuery + * download page and return nextQuery (with number of attempt) */ - private String downloadPage(String query) throws CollectorException { - String resultJson; - String resultXml = ""; - String nextQuery = ""; - String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + ">"; - Node resultNode = null; - NodeList nodeList = null; - String qUrlArgument = ""; - int urlOldResumptionSize = 0; - InputStream theHttpInputStream; + private String downloadPage(String query, final int attempt) throws CollectorException { - // check if cursor=* is initial set otherwise add it to the queryParam URL - if (resumptionType.equalsIgnoreCase("deep-cursor")) { - log.debug("check resumptionType deep-cursor and check cursor=*?{}", query); - if (!query.contains("&cursor=")) { - query += "&cursor=*"; + if (attempt > MAX_ATTEMPTS) { + throw new CollectorException("Max Number of attempts reached, query:" + query); + } + + if (attempt > 0) { + final int delay = (attempt * 5000); + log.debug("Attempt {} with delay {}", attempt, delay); + try { + Thread.sleep(delay); + } catch (final InterruptedException e) { + new CollectorException(e); } } try { - log.info("requestig URL [{}]", query); + String resultJson; + String resultXml = ""; + String nextQuery = ""; + final String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + ">"; + Node resultNode = null; + NodeList nodeList = null; + String qUrlArgument = ""; + int urlOldResumptionSize = 0; + InputStream theHttpInputStream; - URL qUrl = new URL(query); - log.debug("authMethod: {}", authMethod); - if ("bearer".equalsIgnoreCase(this.authMethod)) { - log.trace("authMethod before inputStream: {}", resultXml); - HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); - conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken); - conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); - conn.setRequestMethod("GET"); - theHttpInputStream = conn.getInputStream(); - } else if (BASIC.equalsIgnoreCase(this.authMethod)) { - log.trace("authMethod before inputStream: {}", resultXml); - HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); - conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken); - conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType()); - conn.setRequestMethod("GET"); - theHttpInputStream = conn.getInputStream(); - } else { - theHttpInputStream = qUrl.openStream(); - } - - resultStream = theHttpInputStream; - if ("json".equals(resultOutputFormat)) { - resultJson = IOUtils.toString(resultStream, StandardCharsets.UTF_8); - resultXml = JsonUtils.convertToXML(resultJson); - resultStream = IOUtils.toInputStream(resultXml, UTF_8); - } - - if (!(emptyXml).equalsIgnoreCase(resultXml)) { - resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE); - nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET); - log.debug("nodeList.length: {}", nodeList.getLength()); - for (int i = 0; i < nodeList.getLength(); i++) { - StringWriter sw = new StringWriter(); - transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); - String toEnqueue = sw.toString(); - if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) { - log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml); - } else { - recordQueue.add(sw.toString()); - } + // check if cursor=* is initial set otherwise add it to the queryParam URL + if ("deep-cursor".equalsIgnoreCase(this.resumptionType)) { + log.debug("check resumptionType deep-cursor and check cursor=*?{}", query); + if (!query.contains("&cursor=")) { + query += "&cursor=*"; } - } else { - log.warn("resultXml is equal with emptyXml"); } - resumptionInt += resultSizeValue; + try { + log.info("requesting URL [{}]", query); - switch (resumptionType.toLowerCase()) { - case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items - resumptionStr = xprResumptionPath.evaluate(resultNode); - break; + final URL qUrl = new URL(query); + log.debug("authMethod: {}", this.authMethod); + if ("bearer".equalsIgnoreCase(this.authMethod)) { + log.trace("authMethod before inputStream: {}", resultXml); + final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); + conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + this.authToken); + conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + conn.setRequestMethod("GET"); + theHttpInputStream = conn.getInputStream(); + } else if (this.BASIC.equalsIgnoreCase(this.authMethod)) { + log.trace("authMethod before inputStream: {}", resultXml); + final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); + conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + this.authToken); + conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType()); + conn.setRequestMethod("GET"); + theHttpInputStream = conn.getInputStream(); + } else { + theHttpInputStream = qUrl.openStream(); + } - case "count": // begin at one step for all records, iterate over items - resumptionStr = Integer.toString(resumptionInt); - break; + this.resultStream = theHttpInputStream; + if ("json".equals(this.resultOutputFormat)) { + resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8); + resultXml = JsonUtils.convertToXML(resultJson); + this.resultStream = IOUtils.toInputStream(resultXml, UTF_8); + } - case "discover": // size of result items unknown, iterate over items (for openDOAR - 201808) - if (resultSizeValue < 2) { - throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); + if (!(emptyXml).equalsIgnoreCase(resultXml)) { + resultNode = (Node) this.xpath + .evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE); + nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET); + log.debug("nodeList.length: {}", nodeList.getLength()); + for (int i = 0; i < nodeList.getLength(); i++) { + final StringWriter sw = new StringWriter(); + this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); + final String toEnqueue = sw.toString(); + if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) + || emptyXml.equalsIgnoreCase(toEnqueue)) { + log + .warn( + "The following record resulted in empty item for the feeding queue: {}", resultXml); + } else { + this.recordQueue.add(sw.toString()); + } } - qUrlArgument = qUrl.getQuery(); - String[] arrayQUrlArgument = qUrlArgument.split("&"); - for (String arrayUrlArgStr : arrayQUrlArgument) { - if (arrayUrlArgStr.startsWith(resumptionParam)) { - String[] resumptionKeyValue = arrayUrlArgStr.split("="); - if (isInteger(resumptionKeyValue[1])) { - urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]); - log.debug("discover OldResumptionSize from Url (int): {}", urlOldResumptionSize); - } else { - log.debug("discover OldResumptionSize from Url (str): {}", resumptionKeyValue[1]); + } else { + log.warn("resultXml is equal with emptyXml"); + } + + this.resumptionInt += this.resultSizeValue; + + switch (this.resumptionType.toLowerCase()) { + case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items + this.resumptionStr = this.xprResumptionPath.evaluate(resultNode); + break; + + case "count": // begin at one step for all records, iterate over items + this.resumptionStr = Integer.toString(this.resumptionInt); + break; + + case "discover": // size of result items unknown, iterate over items (for openDOAR - 201808) + if (this.resultSizeValue < 2) { + throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); + } + qUrlArgument = qUrl.getQuery(); + final String[] arrayQUrlArgument = qUrlArgument.split("&"); + for (final String arrayUrlArgStr : arrayQUrlArgument) { + if (arrayUrlArgStr.startsWith(this.resumptionParam)) { + final String[] resumptionKeyValue = arrayUrlArgStr.split("="); + if (isInteger(resumptionKeyValue[1])) { + urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]); + log.debug("discover OldResumptionSize from Url (int): {}", urlOldResumptionSize); + } else { + log.debug("discover OldResumptionSize from Url (str): {}", resumptionKeyValue[1]); + } } } - } - if (((emptyXml).equalsIgnoreCase(resultXml)) - || ((nodeList != null) && (nodeList.getLength() < resultSizeValue))) { - // resumptionStr = ""; - if (nodeList != null) { - discoverResultSize += nodeList.getLength(); + if (((emptyXml).equalsIgnoreCase(resultXml)) + || ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) { + // resumptionStr = ""; + if (nodeList != null) { + this.discoverResultSize += nodeList.getLength(); + } + this.resultTotal = this.discoverResultSize; + } else { + this.resumptionStr = Integer.toString(this.resumptionInt); + this.resultTotal = this.resumptionInt + 1; + if (nodeList != null) { + this.discoverResultSize += nodeList.getLength(); + } } - resultTotal = discoverResultSize; - } else { - resumptionStr = Integer.toString(resumptionInt); - resultTotal = resumptionInt + 1; + log.info("discoverResultSize: {}", this.discoverResultSize); + break; + + case "pagination": + case "page": // pagination, iterate over page numbers + this.pagination += 1; if (nodeList != null) { - discoverResultSize += nodeList.getLength(); + this.discoverResultSize += nodeList.getLength(); + } else { + this.resultTotal = this.discoverResultSize; + this.pagination = this.discoverResultSize; } - } - log.info("discoverResultSize: {}", discoverResultSize); - break; + this.resumptionInt = this.pagination; + this.resumptionStr = Integer.toString(this.resumptionInt); + break; - case "pagination": - case "page": // pagination, iterate over page numbers - pagination += 1; - if (nodeList != null) { - discoverResultSize += nodeList.getLength(); - } else { - resultTotal = discoverResultSize; - pagination = discoverResultSize; - } - resumptionInt = pagination; - resumptionStr = Integer.toString(resumptionInt); - break; + case "deep-cursor": // size of result items unknown, iterate over items (for supporting deep cursor + // in + // solr) + // isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: + // deep-cursor, Param 'resultSizeValue' is less than 2");} - case "deep-cursor": // size of result items unknown, iterate over items (for supporting deep cursor in - // solr) - // isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: - // deep-cursor, Param 'resultSizeValue' is less than 2");} + this.resumptionStr = encodeValue(this.xprResumptionPath.evaluate(resultNode)); + this.queryParams = this.queryParams.replace("&cursor=*", ""); - resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode)); - queryParams = queryParams.replace("&cursor=*", ""); + // terminating if length of nodeList is 0 + if ((nodeList != null) && (nodeList.getLength() < this.discoverResultSize)) { + this.resumptionInt += ((nodeList.getLength() + 1) - this.resultSizeValue); + } else { + this.resumptionInt += (nodeList.getLength() - this.resultSizeValue); // subtract the + // resultSizeValue + // because the iteration is over + // real length and the + // resultSizeValue is added before + // the switch() + } - // terminating if length of nodeList is 0 - if ((nodeList != null) && (nodeList.getLength() < discoverResultSize)) { - resumptionInt += (nodeList.getLength() + 1 - resultSizeValue); - } else { - resumptionInt += (nodeList.getLength() - resultSizeValue); // subtract the resultSizeValue - // because the iteration is over - // real length and the - // resultSizeValue is added before - // the switch() - } + this.discoverResultSize = nodeList.getLength(); - discoverResultSize = nodeList.getLength(); + log + .debug( + "downloadPage().deep-cursor: resumptionStr=" + this.resumptionStr + " ; queryParams=" + + this.queryParams + " resumptionLengthIncreased: " + this.resumptionInt); - log - .debug( - "downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" - + queryParams + " resumptionLengthIncreased: " + resumptionInt); + break; - break; + default: // otherwise: abort + // resultTotal = resumptionInt; + break; + } - default: // otherwise: abort - // resultTotal = resumptionInt; - break; + } catch (final Exception e) { + log.error(e.getMessage(), e); + throw new IllegalStateException("collection failed: " + e.getMessage()); } - } catch (Exception e) { - log.error(e.getMessage(), e); - throw new IllegalStateException("collection failed: " + e.getMessage()); - } - - try { - if (resultTotal == -1) { - resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode)); - if (resumptionType.equalsIgnoreCase("page") && !BASIC.equalsIgnoreCase(authMethod)) { - resultTotal += 1; - } // to correct the upper bound - log.info("resultTotal was -1 is now: " + resultTotal); + try { + if (this.resultTotal == -1) { + this.resultTotal = Integer.parseInt(this.xprResultTotalPath.evaluate(resultNode)); + if ("page".equalsIgnoreCase(this.resumptionType) && !this.BASIC.equalsIgnoreCase(this.authMethod)) { + this.resultTotal += 1; + } // to correct the upper bound + log.info("resultTotal was -1 is now: " + this.resultTotal); + } + } catch (final Exception e) { + log.error(e.getMessage(), e); + throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); } - } catch (Exception e) { - log.error(e.getMessage(), e); - throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); + log.debug("resultTotal: " + this.resultTotal); + log.debug("resInt: " + this.resumptionInt); + if (this.resumptionInt <= this.resultTotal) { + nextQuery = this.baseUrl + "?" + this.queryParams + this.querySize + "&" + this.resumptionParam + "=" + + this.resumptionStr + + this.queryFormat; + } else { + nextQuery = ""; + // if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the + // resumptionInt and prevent a NullPointer Exception at mdStore + } + log.debug("nextQueryUrl: " + nextQuery); + return nextQuery; + } catch (final Throwable e) { + log.warn(e.getMessage(), e); + return downloadPage(query, attempt + 1); } - log.debug("resultTotal: " + resultTotal); - log.debug("resInt: " + resumptionInt); - if (resumptionInt <= resultTotal) { - nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr - + queryFormat; - } else { - nextQuery = ""; - // if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the - // resumptionInt and prevent a NullPointer Exception at mdStore - } - log.debug("nextQueryUrl: " + nextQuery); - return nextQuery; } - private boolean isInteger(String s) { + private boolean isInteger(final String s) { boolean isValidInteger = false; try { Integer.parseInt(s); @@ -383,7 +417,7 @@ public class RestIterator implements Iterator { // s is a valid integer isValidInteger = true; - } catch (NumberFormatException ex) { + } catch (final NumberFormatException ex) { // s is not an integer } @@ -391,20 +425,20 @@ public class RestIterator implements Iterator { } // Method to encode a string value using `UTF-8` encoding scheme - private String encodeValue(String value) { + private String encodeValue(final String value) { try { return URLEncoder.encode(value, StandardCharsets.UTF_8.toString()); - } catch (UnsupportedEncodingException ex) { + } catch (final UnsupportedEncodingException ex) { throw new RuntimeException(ex.getCause()); } } public String getResultFormatValue() { - return resultFormatValue; + return this.resultFormatValue; } public String getResultOutputFormat() { - return resultOutputFormat; + return this.resultOutputFormat; } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/rest/OsfPreprintCollectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/rest/OsfPreprintCollectorTest.java index bc2d126619..90f4c7f25b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/rest/OsfPreprintCollectorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/rest/OsfPreprintCollectorTest.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection.plugin.rest; import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; @@ -69,7 +70,7 @@ public class OsfPreprintCollectorTest { @Test @Disabled - void test() throws CollectorException { + void test_limited() throws CollectorException { final AtomicInteger i = new AtomicInteger(0); final Stream stream = this.rcp.collect(this.api, new AggregatorReport()); @@ -82,4 +83,23 @@ public class OsfPreprintCollectorTest { log.info("{}", i.intValue()); Assertions.assertTrue(i.intValue() > 0); } + + @Test + @Disabled + void test_all() throws CollectorException { + final AtomicLong i = new AtomicLong(0); + final Stream stream = this.rcp.collect(this.api, new AggregatorReport()); + + stream.forEach(s -> { + Assertions.assertTrue(s.length() > 0); + if ((i.incrementAndGet() % 1000) == 0) { + log.info("COLLECTED: {}", i.get()); + } + + }); + + log.info("TOTAL: {}", i.get()); + Assertions.assertTrue(i.get() > 0); + } + }