rest-collector-plugin-with-retry #432

Merged
claudio.atzori merged 3 commits from rest-collector-plugin-with-retry into beta 2024-05-10 09:02:33 +02:00
1 changed files with 200 additions and 179 deletions
Showing only changes of commit 2615136efc - Show all commits

View File

@ -18,7 +18,11 @@ import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.*;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@ -35,7 +39,7 @@ import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
/**
* log.info(...) equal to log.trace(...) in the application-logs
* log.info(...) equal to log.trace(...) in the application-logs
* <p>
* known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue
*
@ -47,6 +51,7 @@ public class RestIterator implements Iterator<String> {
private static final Logger log = LoggerFactory.getLogger(RestIterator.class);
public static final String UTF_8 = "UTF-8";
private static final int MAX_ATTEMPTS = 5;
private final HttpClientParams clientParams;
@ -60,8 +65,8 @@ public class RestIterator implements Iterator<String> {
private final int resultSizeValue;
private int resumptionInt = 0; // integer resumption token (first record to harvest)
private int resultTotal = -1;
private String resumptionStr = Integer.toString(resumptionInt); // string resumption token (first record to harvest
// or token scanned from results)
private String resumptionStr = Integer.toString(this.resumptionInt); // string resumption token (first record to harvest
// or token scanned from results)
private InputStream resultStream;
private Transformer transformer;
private XPath xpath;
@ -73,75 +78,75 @@ public class RestIterator implements Iterator<String> {
private final String querySize;
private final String authMethod;
private final String authToken;
private final Queue<String> recordQueue = new PriorityBlockingQueue<String>();
private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
private int discoverResultSize = 0;
private int pagination = 1;
/*
* While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in
* json. useful for cases when the target API expects a resultFormatValue != json, but the results are returned in
* json. An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format.
* While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in json. useful for
* cases when the target API expects a resultFormatValue != json, but the results are returned in json. An example is the EU Open Data
* Portal API: resultFormatValue=standard, results are in json format.
*/
private final String resultOutputFormat;
/** RestIterator class
* compatible to version 1.3.33
/**
* RestIterator class compatible to version 1.3.33
*/
public RestIterator(
final HttpClientParams clientParams,
final String baseUrl,
final String resumptionType,
final String resumptionParam,
final String resumptionXpath,
final String resultTotalXpath,
final String resultFormatParam,
final String resultFormatValue,
final String resultSizeParam,
final String resultSizeValueStr,
final String queryParams,
final String entityXpath,
final String authMethod,
final String authToken,
final String resultOutputFormat) {
final HttpClientParams clientParams,
final String baseUrl,
final String resumptionType,
final String resumptionParam,
final String resumptionXpath,
final String resultTotalXpath,
final String resultFormatParam,
final String resultFormatValue,
final String resultSizeParam,
final String resultSizeValueStr,
final String queryParams,
final String entityXpath,
final String authMethod,
final String authToken,
final String resultOutputFormat) {
this.clientParams = clientParams;
this.baseUrl = baseUrl;
this.resumptionType = resumptionType;
this.resumptionParam = resumptionParam;
this.resultFormatValue = resultFormatValue;
this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
this.queryParams = queryParams;
this.authMethod = authMethod;
this.authToken = authToken;
this.resultOutputFormat = resultOutputFormat;
queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue
: "";
querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
this.queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue
: "";
this.querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
try {
initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
} catch (Exception e) {
} catch (final Exception e) {
throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
}
initQueue();
}
private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
throws TransformerConfigurationException, XPathExpressionException {
private void initXmlTransformation(final String resultTotalXpath, final String resumptionXpath, final String entityXpath)
throws TransformerConfigurationException, XPathExpressionException {
final TransformerFactory factory = TransformerFactory.newInstance();
transformer = factory.newTransformer();
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
xpath = XPathFactory.newInstance().newXPath();
xprResultTotalPath = xpath.compile(resultTotalXpath);
xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
xprEntity = xpath.compile(entityXpath);
this.transformer = factory.newTransformer();
this.transformer.setOutputProperty(OutputKeys.INDENT, "yes");
this.transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
this.xpath = XPathFactory.newInstance().newXPath();
this.xprResultTotalPath = this.xpath.compile(resultTotalXpath);
this.xprResumptionPath = this.xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
this.xprEntity = this.xpath.compile(entityXpath);
}
private void initQueue() {
query = baseUrl + "?" + queryParams + querySize + queryFormat;
log.info("REST calls starting with {}", query);
this.query = this.baseUrl + "?" + this.queryParams + this.querySize + this.queryFormat;
log.info("REST calls starting with {}", this.query);
}
private void disconnect() {
@ -150,127 +155,140 @@ public class RestIterator implements Iterator<String> {
/*
* (non-Javadoc)
*
* @see java.util.Iterator#hasNext()
*/
@Override
public boolean hasNext() {
if (recordQueue.isEmpty() && query.isEmpty()) {
if (this.recordQueue.isEmpty() && this.query.isEmpty()) {
disconnect();
return false;
} else {
return true;
}
return true;
}
/*
* (non-Javadoc)
*
* @see java.util.Iterator#next()
*/
@Override
public String next() {
synchronized (recordQueue) {
while (recordQueue.isEmpty() && !query.isEmpty()) {
synchronized (this.recordQueue) {
while (this.recordQueue.isEmpty() && !this.query.isEmpty()) {
try {
query = downloadPage(query);
} catch (CollectorException e) {
this.query = downloadPage(this.query, 0);
} catch (final CollectorException e) {
log.debug("CollectorPlugin.next()-Exception: {}", e);
throw new RuntimeException(e);
}
}
return recordQueue.poll();
return this.recordQueue.poll();
}
}
/*
* download page and return nextQuery
* download page and return nextQuery (with number of attempt)
*/
private String downloadPage(String query) throws CollectorException {
String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
String nextQuery = "";
String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
Node resultNode = null;
NodeList nodeList = null;
String qUrlArgument = "";
int urlOldResumptionSize = 0;
InputStream theHttpInputStream;
private String downloadPage(String query, final int attempt) throws CollectorException {
// check if cursor=* is initial set otherwise add it to the queryParam URL
if (resumptionType.equalsIgnoreCase("deep-cursor")) {
log.debug("check resumptionType deep-cursor and check cursor=*?{}", query);
if (!query.contains("&cursor=")) {
query += "&cursor=*";
if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, query:" + query); }
if (attempt > 0) {
final int delay = (attempt * 5000);
log.debug("Attempt {} with delay {}", attempt, delay);
try {
Thread.sleep(delay);
} catch (final InterruptedException e) {
new CollectorException(e);
}
}
try {
log.info("requestig URL [{}]", query);
String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
String nextQuery = "";
final String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
Node resultNode = null;
NodeList nodeList = null;
String qUrlArgument = "";
int urlOldResumptionSize = 0;
InputStream theHttpInputStream;
URL qUrl = new URL(query);
log.debug("authMethod: {}", authMethod);
if ("bearer".equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: {}", resultXml);
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken);
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else if (BASIC.equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: {}", resultXml);
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken);
conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else {
theHttpInputStream = qUrl.openStream();
}
resultStream = theHttpInputStream;
if ("json".equals(resultOutputFormat)) {
resultJson = IOUtils.toString(resultStream, StandardCharsets.UTF_8);
resultXml = JsonUtils.convertToXML(resultJson);
resultStream = IOUtils.toInputStream(resultXml, UTF_8);
}
if (!(emptyXml).equalsIgnoreCase(resultXml)) {
resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
log.debug("nodeList.length: {}", nodeList.getLength());
for (int i = 0; i < nodeList.getLength(); i++) {
StringWriter sw = new StringWriter();
transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
String toEnqueue = sw.toString();
if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) {
log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml);
} else {
recordQueue.add(sw.toString());
}
// check if cursor=* is initial set otherwise add it to the queryParam URL
if ("deep-cursor".equalsIgnoreCase(this.resumptionType)) {
log.debug("check resumptionType deep-cursor and check cursor=*?{}", query);
if (!query.contains("&cursor=")) {
query += "&cursor=*";
}
} else {
log.warn("resultXml is equal with emptyXml");
}
resumptionInt += resultSizeValue;
try {
log.info("requesting URL [{}]", query);
switch (resumptionType.toLowerCase()) {
final URL qUrl = new URL(query);
log.debug("authMethod: {}", this.authMethod);
if ("bearer".equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: {}", resultXml);
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + this.authToken);
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else if (this.BASIC.equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: {}", resultXml);
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + this.authToken);
conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType());
conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream();
} else {
theHttpInputStream = qUrl.openStream();
}
this.resultStream = theHttpInputStream;
if ("json".equals(this.resultOutputFormat)) {
resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8);
resultXml = JsonUtils.convertToXML(resultJson);
this.resultStream = IOUtils.toInputStream(resultXml, UTF_8);
}
if (!(emptyXml).equalsIgnoreCase(resultXml)) {
resultNode = (Node) this.xpath.evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE);
nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET);
log.debug("nodeList.length: {}", nodeList.getLength());
for (int i = 0; i < nodeList.getLength(); i++) {
final StringWriter sw = new StringWriter();
this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
final String toEnqueue = sw.toString();
if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) {
log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml);
} else {
this.recordQueue.add(sw.toString());
}
}
} else {
log.warn("resultXml is equal with emptyXml");
}
this.resumptionInt += this.resultSizeValue;
switch (this.resumptionType.toLowerCase()) {
case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
resumptionStr = xprResumptionPath.evaluate(resultNode);
this.resumptionStr = this.xprResumptionPath.evaluate(resultNode);
break;
case "count": // begin at one step for all records, iterate over items
resumptionStr = Integer.toString(resumptionInt);
this.resumptionStr = Integer.toString(this.resumptionInt);
break;
case "discover": // size of result items unknown, iterate over items (for openDOAR - 201808)
if (resultSizeValue < 2) {
throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2");
}
if (this.resultSizeValue < 2) { throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); }
qUrlArgument = qUrl.getQuery();
String[] arrayQUrlArgument = qUrlArgument.split("&");
for (String arrayUrlArgStr : arrayQUrlArgument) {
if (arrayUrlArgStr.startsWith(resumptionParam)) {
String[] resumptionKeyValue = arrayUrlArgStr.split("=");
final String[] arrayQUrlArgument = qUrlArgument.split("&");
for (final String arrayUrlArgStr : arrayQUrlArgument) {
if (arrayUrlArgStr.startsWith(this.resumptionParam)) {
final String[] resumptionKeyValue = arrayUrlArgStr.split("=");
if (isInteger(resumptionKeyValue[1])) {
urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
log.debug("discover OldResumptionSize from Url (int): {}", urlOldResumptionSize);
@ -281,101 +299,104 @@ public class RestIterator implements Iterator<String> {
}
if (((emptyXml).equalsIgnoreCase(resultXml))
|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))) {
|| ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) {
// resumptionStr = "";
if (nodeList != null) {
discoverResultSize += nodeList.getLength();
this.discoverResultSize += nodeList.getLength();
}
resultTotal = discoverResultSize;
this.resultTotal = this.discoverResultSize;
} else {
resumptionStr = Integer.toString(resumptionInt);
resultTotal = resumptionInt + 1;
this.resumptionStr = Integer.toString(this.resumptionInt);
this.resultTotal = this.resumptionInt + 1;
if (nodeList != null) {
discoverResultSize += nodeList.getLength();
this.discoverResultSize += nodeList.getLength();
}
}
log.info("discoverResultSize: {}", discoverResultSize);
log.info("discoverResultSize: {}", this.discoverResultSize);
break;
case "pagination":
case "page": // pagination, iterate over page numbers
pagination += 1;
this.pagination += 1;
if (nodeList != null) {
discoverResultSize += nodeList.getLength();
this.discoverResultSize += nodeList.getLength();
} else {
resultTotal = discoverResultSize;
pagination = discoverResultSize;
this.resultTotal = this.discoverResultSize;
this.pagination = this.discoverResultSize;
}
resumptionInt = pagination;
resumptionStr = Integer.toString(resumptionInt);
this.resumptionInt = this.pagination;
this.resumptionStr = Integer.toString(this.resumptionInt);
break;
case "deep-cursor": // size of result items unknown, iterate over items (for supporting deep cursor in
// solr)
// solr)
// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode:
// deep-cursor, Param 'resultSizeValue' is less than 2");}
resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
queryParams = queryParams.replace("&cursor=*", "");
this.resumptionStr = encodeValue(this.xprResumptionPath.evaluate(resultNode));
this.queryParams = this.queryParams.replace("&cursor=*", "");
// terminating if length of nodeList is 0
if ((nodeList != null) && (nodeList.getLength() < discoverResultSize)) {
resumptionInt += (nodeList.getLength() + 1 - resultSizeValue);
if ((nodeList != null) && (nodeList.getLength() < this.discoverResultSize)) {
this.resumptionInt += ((nodeList.getLength() + 1) - this.resultSizeValue);
} else {
resumptionInt += (nodeList.getLength() - resultSizeValue); // subtract the resultSizeValue
// because the iteration is over
// real length and the
// resultSizeValue is added before
// the switch()
this.resumptionInt += (nodeList.getLength() - this.resultSizeValue); // subtract the resultSizeValue
// because the iteration is over
// real length and the
// resultSizeValue is added before
// the switch()
}
discoverResultSize = nodeList.getLength();
this.discoverResultSize = nodeList.getLength();
log
.debug(
"downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams="
+ queryParams + " resumptionLengthIncreased: " + resumptionInt);
.debug("downloadPage().deep-cursor: resumptionStr=" + this.resumptionStr + " ; queryParams="
+ this.queryParams + " resumptionLengthIncreased: " + this.resumptionInt);
break;
default: // otherwise: abort
// resultTotal = resumptionInt;
break;
}
} catch (final Exception e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("collection failed: " + e.getMessage());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("collection failed: " + e.getMessage());
}
try {
if (resultTotal == -1) {
resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
if (resumptionType.equalsIgnoreCase("page") && !BASIC.equalsIgnoreCase(authMethod)) {
resultTotal += 1;
} // to correct the upper bound
log.info("resultTotal was -1 is now: " + resultTotal);
try {
if (this.resultTotal == -1) {
this.resultTotal = Integer.parseInt(this.xprResultTotalPath.evaluate(resultNode));
if ("page".equalsIgnoreCase(this.resumptionType) && !this.BASIC.equalsIgnoreCase(this.authMethod)) {
this.resultTotal += 1;
} // to correct the upper bound
log.info("resultTotal was -1 is now: " + this.resultTotal);
}
} catch (final Exception e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
log.debug("resultTotal: " + this.resultTotal);
log.debug("resInt: " + this.resumptionInt);
if (this.resumptionInt <= this.resultTotal) {
nextQuery = this.baseUrl + "?" + this.queryParams + this.querySize + "&" + this.resumptionParam + "=" + this.resumptionStr
+ this.queryFormat;
} else {
nextQuery = "";
// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the
// resumptionInt and prevent a NullPointer Exception at mdStore
}
log.debug("nextQueryUrl: " + nextQuery);
return nextQuery;
} catch (final Throwable e) {
log.warn(e.getMessage(), e);
return downloadPage(query, attempt + 1);
}
log.debug("resultTotal: " + resultTotal);
log.debug("resInt: " + resumptionInt);
if (resumptionInt <= resultTotal) {
nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr
+ queryFormat;
} else {
nextQuery = "";
// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the
// resumptionInt and prevent a NullPointer Exception at mdStore
}
log.debug("nextQueryUrl: " + nextQuery);
return nextQuery;
}
private boolean isInteger(String s) {
private boolean isInteger(final String s) {
boolean isValidInteger = false;
try {
Integer.parseInt(s);
@ -383,7 +404,7 @@ public class RestIterator implements Iterator<String> {
// s is a valid integer
isValidInteger = true;
} catch (NumberFormatException ex) {
} catch (final NumberFormatException ex) {
// s is not an integer
}
@ -391,20 +412,20 @@ public class RestIterator implements Iterator<String> {
}
// Method to encode a string value using `UTF-8` encoding scheme
private String encodeValue(String value) {
private String encodeValue(final String value) {
try {
return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException ex) {
} catch (final UnsupportedEncodingException ex) {
throw new RuntimeException(ex.getCause());
}
}
public String getResultFormatValue() {
return resultFormatValue;
return this.resultFormatValue;
}
public String getResultOutputFormat() {
return resultOutputFormat;
return this.resultOutputFormat;
}
}