From 2615136efc0a86ceb92f82f2380e68230330ef83 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 30 Apr 2024 11:58:42 +0200 Subject: [PATCH] added a retry mechanism --- .../collection/plugin/rest/RestIterator.java | 379 +++++++++--------- 1 file changed, 200 insertions(+), 179 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java index 1107bcf46..c13f29806 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java @@ -18,7 +18,11 @@ import javax.xml.transform.TransformerConfigurationException; import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; -import javax.xml.xpath.*; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -35,7 +39,7 @@ import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.HttpClientParams; /** - * log.info(...) equal to log.trace(...) in the application-logs + * log.info(...) equal to log.trace(...) in the application-logs *

* known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue * @@ -47,6 +51,7 @@ public class RestIterator implements Iterator { private static final Logger log = LoggerFactory.getLogger(RestIterator.class); public static final String UTF_8 = "UTF-8"; + private static final int MAX_ATTEMPTS = 5; private final HttpClientParams clientParams; @@ -60,8 +65,8 @@ public class RestIterator implements Iterator { private final int resultSizeValue; private int resumptionInt = 0; // integer resumption token (first record to harvest) private int resultTotal = -1; - private String resumptionStr = Integer.toString(resumptionInt); // string resumption token (first record to harvest - // or token scanned from results) + private String resumptionStr = Integer.toString(this.resumptionInt); // string resumption token (first record to harvest + // or token scanned from results) private InputStream resultStream; private Transformer transformer; private XPath xpath; @@ -73,75 +78,75 @@ public class RestIterator implements Iterator { private final String querySize; private final String authMethod; private final String authToken; - private final Queue recordQueue = new PriorityBlockingQueue(); + private final Queue recordQueue = new PriorityBlockingQueue<>(); private int discoverResultSize = 0; private int pagination = 1; /* - * While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in - * json. useful for cases when the target API expects a resultFormatValue != json, but the results are returned in - * json. An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format. + * While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in json. useful for + * cases when the target API expects a resultFormatValue != json, but the results are returned in json. An example is the EU Open Data + * Portal API: resultFormatValue=standard, results are in json format. */ private final String resultOutputFormat; - /** RestIterator class - * compatible to version 1.3.33 + /** + * RestIterator class compatible to version 1.3.33 */ public RestIterator( - final HttpClientParams clientParams, - final String baseUrl, - final String resumptionType, - final String resumptionParam, - final String resumptionXpath, - final String resultTotalXpath, - final String resultFormatParam, - final String resultFormatValue, - final String resultSizeParam, - final String resultSizeValueStr, - final String queryParams, - final String entityXpath, - final String authMethod, - final String authToken, - final String resultOutputFormat) { + final HttpClientParams clientParams, + final String baseUrl, + final String resumptionType, + final String resumptionParam, + final String resumptionXpath, + final String resultTotalXpath, + final String resultFormatParam, + final String resultFormatValue, + final String resultSizeParam, + final String resultSizeValueStr, + final String queryParams, + final String entityXpath, + final String authMethod, + final String authToken, + final String resultOutputFormat) { this.clientParams = clientParams; this.baseUrl = baseUrl; this.resumptionType = resumptionType; this.resumptionParam = resumptionParam; this.resultFormatValue = resultFormatValue; - this.resultSizeValue = Integer.valueOf(resultSizeValueStr); + this.resultSizeValue = Integer.parseInt(resultSizeValueStr); this.queryParams = queryParams; this.authMethod = authMethod; this.authToken = authToken; this.resultOutputFormat = resultOutputFormat; - queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue - : ""; - querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : ""; + this.queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue + : ""; + this.querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : ""; try { initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath); - } catch (Exception e) { + } catch (final Exception e) { throw new IllegalStateException("xml transformation init failed: " + e.getMessage()); } initQueue(); } - private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) - throws TransformerConfigurationException, XPathExpressionException { + private void initXmlTransformation(final String resultTotalXpath, final String resumptionXpath, final String entityXpath) + throws TransformerConfigurationException, XPathExpressionException { final TransformerFactory factory = TransformerFactory.newInstance(); - transformer = factory.newTransformer(); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3"); - xpath = XPathFactory.newInstance().newXPath(); - xprResultTotalPath = xpath.compile(resultTotalXpath); - xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath); - xprEntity = xpath.compile(entityXpath); + this.transformer = factory.newTransformer(); + this.transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + this.transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3"); + this.xpath = XPathFactory.newInstance().newXPath(); + this.xprResultTotalPath = this.xpath.compile(resultTotalXpath); + this.xprResumptionPath = this.xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath); + this.xprEntity = this.xpath.compile(entityXpath); } private void initQueue() { - query = baseUrl + "?" + queryParams + querySize + queryFormat; - log.info("REST calls starting with {}", query); + this.query = this.baseUrl + "?" + this.queryParams + this.querySize + this.queryFormat; + log.info("REST calls starting with {}", this.query); } private void disconnect() { @@ -150,127 +155,140 @@ public class RestIterator implements Iterator { /* * (non-Javadoc) + * * @see java.util.Iterator#hasNext() */ @Override public boolean hasNext() { - if (recordQueue.isEmpty() && query.isEmpty()) { + if (this.recordQueue.isEmpty() && this.query.isEmpty()) { disconnect(); return false; - } else { - return true; } + return true; } /* * (non-Javadoc) + * * @see java.util.Iterator#next() */ @Override public String next() { - synchronized (recordQueue) { - while (recordQueue.isEmpty() && !query.isEmpty()) { + synchronized (this.recordQueue) { + while (this.recordQueue.isEmpty() && !this.query.isEmpty()) { try { - query = downloadPage(query); - } catch (CollectorException e) { + this.query = downloadPage(this.query, 0); + } catch (final CollectorException e) { log.debug("CollectorPlugin.next()-Exception: {}", e); throw new RuntimeException(e); } } - return recordQueue.poll(); + return this.recordQueue.poll(); } } /* - * download page and return nextQuery + * download page and return nextQuery (with number of attempt) */ - private String downloadPage(String query) throws CollectorException { - String resultJson; - String resultXml = ""; - String nextQuery = ""; - String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + ">"; - Node resultNode = null; - NodeList nodeList = null; - String qUrlArgument = ""; - int urlOldResumptionSize = 0; - InputStream theHttpInputStream; + private String downloadPage(String query, final int attempt) throws CollectorException { - // check if cursor=* is initial set otherwise add it to the queryParam URL - if (resumptionType.equalsIgnoreCase("deep-cursor")) { - log.debug("check resumptionType deep-cursor and check cursor=*?{}", query); - if (!query.contains("&cursor=")) { - query += "&cursor=*"; + if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, query:" + query); } + + if (attempt > 0) { + final int delay = (attempt * 5000); + log.debug("Attempt {} with delay {}", attempt, delay); + try { + Thread.sleep(delay); + } catch (final InterruptedException e) { + new CollectorException(e); } } try { - log.info("requestig URL [{}]", query); + String resultJson; + String resultXml = ""; + String nextQuery = ""; + final String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + ">"; + Node resultNode = null; + NodeList nodeList = null; + String qUrlArgument = ""; + int urlOldResumptionSize = 0; + InputStream theHttpInputStream; - URL qUrl = new URL(query); - log.debug("authMethod: {}", authMethod); - if ("bearer".equalsIgnoreCase(this.authMethod)) { - log.trace("authMethod before inputStream: {}", resultXml); - HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); - conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken); - conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); - conn.setRequestMethod("GET"); - theHttpInputStream = conn.getInputStream(); - } else if (BASIC.equalsIgnoreCase(this.authMethod)) { - log.trace("authMethod before inputStream: {}", resultXml); - HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); - conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken); - conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType()); - conn.setRequestMethod("GET"); - theHttpInputStream = conn.getInputStream(); - } else { - theHttpInputStream = qUrl.openStream(); - } - - resultStream = theHttpInputStream; - if ("json".equals(resultOutputFormat)) { - resultJson = IOUtils.toString(resultStream, StandardCharsets.UTF_8); - resultXml = JsonUtils.convertToXML(resultJson); - resultStream = IOUtils.toInputStream(resultXml, UTF_8); - } - - if (!(emptyXml).equalsIgnoreCase(resultXml)) { - resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE); - nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET); - log.debug("nodeList.length: {}", nodeList.getLength()); - for (int i = 0; i < nodeList.getLength(); i++) { - StringWriter sw = new StringWriter(); - transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); - String toEnqueue = sw.toString(); - if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) { - log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml); - } else { - recordQueue.add(sw.toString()); - } + // check if cursor=* is initial set otherwise add it to the queryParam URL + if ("deep-cursor".equalsIgnoreCase(this.resumptionType)) { + log.debug("check resumptionType deep-cursor and check cursor=*?{}", query); + if (!query.contains("&cursor=")) { + query += "&cursor=*"; } - } else { - log.warn("resultXml is equal with emptyXml"); } - resumptionInt += resultSizeValue; + try { + log.info("requesting URL [{}]", query); - switch (resumptionType.toLowerCase()) { + final URL qUrl = new URL(query); + log.debug("authMethod: {}", this.authMethod); + if ("bearer".equalsIgnoreCase(this.authMethod)) { + log.trace("authMethod before inputStream: {}", resultXml); + final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); + conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + this.authToken); + conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + conn.setRequestMethod("GET"); + theHttpInputStream = conn.getInputStream(); + } else if (this.BASIC.equalsIgnoreCase(this.authMethod)) { + log.trace("authMethod before inputStream: {}", resultXml); + final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); + conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + this.authToken); + conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType()); + conn.setRequestMethod("GET"); + theHttpInputStream = conn.getInputStream(); + } else { + theHttpInputStream = qUrl.openStream(); + } + + this.resultStream = theHttpInputStream; + if ("json".equals(this.resultOutputFormat)) { + resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8); + resultXml = JsonUtils.convertToXML(resultJson); + this.resultStream = IOUtils.toInputStream(resultXml, UTF_8); + } + + if (!(emptyXml).equalsIgnoreCase(resultXml)) { + resultNode = (Node) this.xpath.evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE); + nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET); + log.debug("nodeList.length: {}", nodeList.getLength()); + for (int i = 0; i < nodeList.getLength(); i++) { + final StringWriter sw = new StringWriter(); + this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); + final String toEnqueue = sw.toString(); + if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) { + log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml); + } else { + this.recordQueue.add(sw.toString()); + } + } + } else { + log.warn("resultXml is equal with emptyXml"); + } + + this.resumptionInt += this.resultSizeValue; + + switch (this.resumptionType.toLowerCase()) { case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items - resumptionStr = xprResumptionPath.evaluate(resultNode); + this.resumptionStr = this.xprResumptionPath.evaluate(resultNode); break; case "count": // begin at one step for all records, iterate over items - resumptionStr = Integer.toString(resumptionInt); + this.resumptionStr = Integer.toString(this.resumptionInt); break; case "discover": // size of result items unknown, iterate over items (for openDOAR - 201808) - if (resultSizeValue < 2) { - throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); - } + if (this.resultSizeValue < 2) { throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); } qUrlArgument = qUrl.getQuery(); - String[] arrayQUrlArgument = qUrlArgument.split("&"); - for (String arrayUrlArgStr : arrayQUrlArgument) { - if (arrayUrlArgStr.startsWith(resumptionParam)) { - String[] resumptionKeyValue = arrayUrlArgStr.split("="); + final String[] arrayQUrlArgument = qUrlArgument.split("&"); + for (final String arrayUrlArgStr : arrayQUrlArgument) { + if (arrayUrlArgStr.startsWith(this.resumptionParam)) { + final String[] resumptionKeyValue = arrayUrlArgStr.split("="); if (isInteger(resumptionKeyValue[1])) { urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]); log.debug("discover OldResumptionSize from Url (int): {}", urlOldResumptionSize); @@ -281,101 +299,104 @@ public class RestIterator implements Iterator { } if (((emptyXml).equalsIgnoreCase(resultXml)) - || ((nodeList != null) && (nodeList.getLength() < resultSizeValue))) { + || ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) { // resumptionStr = ""; if (nodeList != null) { - discoverResultSize += nodeList.getLength(); + this.discoverResultSize += nodeList.getLength(); } - resultTotal = discoverResultSize; + this.resultTotal = this.discoverResultSize; } else { - resumptionStr = Integer.toString(resumptionInt); - resultTotal = resumptionInt + 1; + this.resumptionStr = Integer.toString(this.resumptionInt); + this.resultTotal = this.resumptionInt + 1; if (nodeList != null) { - discoverResultSize += nodeList.getLength(); + this.discoverResultSize += nodeList.getLength(); } } - log.info("discoverResultSize: {}", discoverResultSize); + log.info("discoverResultSize: {}", this.discoverResultSize); break; case "pagination": case "page": // pagination, iterate over page numbers - pagination += 1; + this.pagination += 1; if (nodeList != null) { - discoverResultSize += nodeList.getLength(); + this.discoverResultSize += nodeList.getLength(); } else { - resultTotal = discoverResultSize; - pagination = discoverResultSize; + this.resultTotal = this.discoverResultSize; + this.pagination = this.discoverResultSize; } - resumptionInt = pagination; - resumptionStr = Integer.toString(resumptionInt); + this.resumptionInt = this.pagination; + this.resumptionStr = Integer.toString(this.resumptionInt); break; case "deep-cursor": // size of result items unknown, iterate over items (for supporting deep cursor in - // solr) + // solr) // isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: // deep-cursor, Param 'resultSizeValue' is less than 2");} - resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode)); - queryParams = queryParams.replace("&cursor=*", ""); + this.resumptionStr = encodeValue(this.xprResumptionPath.evaluate(resultNode)); + this.queryParams = this.queryParams.replace("&cursor=*", ""); // terminating if length of nodeList is 0 - if ((nodeList != null) && (nodeList.getLength() < discoverResultSize)) { - resumptionInt += (nodeList.getLength() + 1 - resultSizeValue); + if ((nodeList != null) && (nodeList.getLength() < this.discoverResultSize)) { + this.resumptionInt += ((nodeList.getLength() + 1) - this.resultSizeValue); } else { - resumptionInt += (nodeList.getLength() - resultSizeValue); // subtract the resultSizeValue - // because the iteration is over - // real length and the - // resultSizeValue is added before - // the switch() + this.resumptionInt += (nodeList.getLength() - this.resultSizeValue); // subtract the resultSizeValue + // because the iteration is over + // real length and the + // resultSizeValue is added before + // the switch() } - discoverResultSize = nodeList.getLength(); + this.discoverResultSize = nodeList.getLength(); log - .debug( - "downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" - + queryParams + " resumptionLengthIncreased: " + resumptionInt); + .debug("downloadPage().deep-cursor: resumptionStr=" + this.resumptionStr + " ; queryParams=" + + this.queryParams + " resumptionLengthIncreased: " + this.resumptionInt); break; default: // otherwise: abort // resultTotal = resumptionInt; break; + } + + } catch (final Exception e) { + log.error(e.getMessage(), e); + throw new IllegalStateException("collection failed: " + e.getMessage()); } - } catch (Exception e) { - log.error(e.getMessage(), e); - throw new IllegalStateException("collection failed: " + e.getMessage()); - } - - try { - if (resultTotal == -1) { - resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode)); - if (resumptionType.equalsIgnoreCase("page") && !BASIC.equalsIgnoreCase(authMethod)) { - resultTotal += 1; - } // to correct the upper bound - log.info("resultTotal was -1 is now: " + resultTotal); + try { + if (this.resultTotal == -1) { + this.resultTotal = Integer.parseInt(this.xprResultTotalPath.evaluate(resultNode)); + if ("page".equalsIgnoreCase(this.resumptionType) && !this.BASIC.equalsIgnoreCase(this.authMethod)) { + this.resultTotal += 1; + } // to correct the upper bound + log.info("resultTotal was -1 is now: " + this.resultTotal); + } + } catch (final Exception e) { + log.error(e.getMessage(), e); + throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); } - } catch (Exception e) { - log.error(e.getMessage(), e); - throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); + log.debug("resultTotal: " + this.resultTotal); + log.debug("resInt: " + this.resumptionInt); + if (this.resumptionInt <= this.resultTotal) { + nextQuery = this.baseUrl + "?" + this.queryParams + this.querySize + "&" + this.resumptionParam + "=" + this.resumptionStr + + this.queryFormat; + } else { + nextQuery = ""; + // if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the + // resumptionInt and prevent a NullPointer Exception at mdStore + } + log.debug("nextQueryUrl: " + nextQuery); + return nextQuery; + } catch (final Throwable e) { + log.warn(e.getMessage(), e); + return downloadPage(query, attempt + 1); } - log.debug("resultTotal: " + resultTotal); - log.debug("resInt: " + resumptionInt); - if (resumptionInt <= resultTotal) { - nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr - + queryFormat; - } else { - nextQuery = ""; - // if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the - // resumptionInt and prevent a NullPointer Exception at mdStore - } - log.debug("nextQueryUrl: " + nextQuery); - return nextQuery; } - private boolean isInteger(String s) { + private boolean isInteger(final String s) { boolean isValidInteger = false; try { Integer.parseInt(s); @@ -383,7 +404,7 @@ public class RestIterator implements Iterator { // s is a valid integer isValidInteger = true; - } catch (NumberFormatException ex) { + } catch (final NumberFormatException ex) { // s is not an integer } @@ -391,20 +412,20 @@ public class RestIterator implements Iterator { } // Method to encode a string value using `UTF-8` encoding scheme - private String encodeValue(String value) { + private String encodeValue(final String value) { try { return URLEncoder.encode(value, StandardCharsets.UTF_8.toString()); - } catch (UnsupportedEncodingException ex) { + } catch (final UnsupportedEncodingException ex) { throw new RuntimeException(ex.getCause()); } } public String getResultFormatValue() { - return resultFormatValue; + return this.resultFormatValue; } public String getResultOutputFormat() { - return resultOutputFormat; + return this.resultOutputFormat; } }