Merge pull request 'Fixes for RestIterator' (#473) from fix_pagination into main

Reviewed-on: #473
This commit is contained in:
Giambattista Bloisi 2024-09-05 16:59:16 +02:00
commit 10cad80d4d
1 changed files with 58 additions and 31 deletions

View File

@ -12,6 +12,8 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.transform.OutputKeys; import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer; import javax.xml.transform.Transformer;
@ -19,16 +21,10 @@ import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory; import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource; import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath; import javax.xml.xpath.*;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.w3c.dom.Node; import org.w3c.dom.Node;
@ -51,7 +47,6 @@ import eu.dnetlib.dhp.common.collection.HttpClientParams;
* *
*/ */
public class RestIterator implements Iterator<String> { public class RestIterator implements Iterator<String> {
private static final Logger log = LoggerFactory.getLogger(RestIterator.class); private static final Logger log = LoggerFactory.getLogger(RestIterator.class);
public static final String UTF_8 = "UTF-8"; public static final String UTF_8 = "UTF-8";
private static final int MAX_ATTEMPTS = 5; private static final int MAX_ATTEMPTS = 5;
@ -60,11 +55,15 @@ public class RestIterator implements Iterator<String> {
private final String AUTHBASIC = "basic"; private final String AUTHBASIC = "basic";
private static final String XML_HEADER = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
private static final String EMPTY_XML = XML_HEADER + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG
+ ">";
private final String baseUrl; private final String baseUrl;
private final String resumptionType; private final String resumptionType;
private final String resumptionParam; private final String resumptionParam;
private final String resultFormatValue; private final String resultFormatValue;
private String queryParams; private String queryParams = "";
private final int resultSizeValue; private final int resultSizeValue;
private int resumptionInt = 0; // integer resumption token (first record to harvest) private int resumptionInt = 0; // integer resumption token (first record to harvest)
private int resultTotal = -1; private int resultTotal = -1;
@ -158,7 +157,12 @@ public class RestIterator implements Iterator<String> {
} }
private void initQueue() { private void initQueue() {
this.query = this.baseUrl + "?" + this.queryParams + this.querySize + this.queryFormat; if (queryParams.equals("") && querySize.equals("") && queryFormat.equals("")) {
query = baseUrl;
} else {
query = baseUrl + "?" + queryParams + querySize + queryFormat;
}
log.info("REST calls starting with {}", this.query); log.info("REST calls starting with {}", this.query);
} }
@ -172,19 +176,6 @@ public class RestIterator implements Iterator<String> {
*/ */
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (this.recordQueue.isEmpty() && this.query.isEmpty()) {
disconnect();
return false;
}
return true;
}
/*
* (non-Javadoc)
* @see java.util.Iterator#next()
*/
@Override
public String next() {
synchronized (this.recordQueue) { synchronized (this.recordQueue) {
while (this.recordQueue.isEmpty() && !this.query.isEmpty()) { while (this.recordQueue.isEmpty() && !this.query.isEmpty()) {
try { try {
@ -194,6 +185,23 @@ public class RestIterator implements Iterator<String> {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
if (!this.recordQueue.isEmpty()) {
return true;
}
disconnect();
return false;
}
}
/*
* (non-Javadoc)
* @see java.util.Iterator#next()
*/
@Override
public String next() {
synchronized (this.recordQueue) {
return this.recordQueue.poll(); return this.recordQueue.poll();
} }
} }
@ -219,9 +227,8 @@ public class RestIterator implements Iterator<String> {
try { try {
String resultJson; String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"; String resultXml = XML_HEADER;
String nextQuery = ""; String nextQuery = "";
final String emptyXml = resultXml + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
Node resultNode = null; Node resultNode = null;
NodeList nodeList = null; NodeList nodeList = null;
String qUrlArgument = ""; String qUrlArgument = "";
@ -236,6 +243,22 @@ public class RestIterator implements Iterator<String> {
} }
} }
// find pagination page start number in queryParam and remove before start the first query
if ((resumptionType.toLowerCase().equals("pagination") || resumptionType.toLowerCase().equals("page"))
&& (query.contains("paginationStart="))) {
final Matcher m = Pattern.compile("paginationStart=([0-9]+)").matcher(query);
m.find(); // guaranteed to be true for this regex
String[] pageVal = m.group(0).split("=");
pagination = Integer.parseInt(pageVal[1]);
// remove page start number from query and queryParams
queryParams = queryParams.replaceFirst("&?paginationStart=[0-9]+", "");
query = query.replaceFirst("&?paginationStart=[0-9]+", "");
}
try { try {
log.info("requesting URL [{}]", query); log.info("requesting URL [{}]", query);
@ -261,7 +284,7 @@ public class RestIterator implements Iterator<String> {
this.resultStream = IOUtils.toInputStream(resultXml, UTF_8); this.resultStream = IOUtils.toInputStream(resultXml, UTF_8);
} }
if (!(emptyXml).equalsIgnoreCase(resultXml)) { if (!isEmptyXml(resultXml)) {
resultNode = (Node) this.xpath resultNode = (Node) this.xpath
.evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE); .evaluate("/", new InputSource(this.resultStream), XPathConstants.NODE);
nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET); nodeList = (NodeList) this.xprEntity.evaluate(resultNode, XPathConstants.NODESET);
@ -270,8 +293,7 @@ public class RestIterator implements Iterator<String> {
final StringWriter sw = new StringWriter(); final StringWriter sw = new StringWriter();
this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
final String toEnqueue = sw.toString(); final String toEnqueue = sw.toString();
if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) || isEmptyXml(toEnqueue)) {
|| emptyXml.equalsIgnoreCase(toEnqueue)) {
log log
.warn( .warn(
"The following record resulted in empty item for the feeding queue: {}", resultXml); "The following record resulted in empty item for the feeding queue: {}", resultXml);
@ -299,6 +321,7 @@ public class RestIterator implements Iterator<String> {
throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2"); throw new CollectorException("Mode: discover, Param 'resultSizeValue' is less than 2");
} }
qUrlArgument = qUrl.getQuery(); qUrlArgument = qUrl.getQuery();
final String[] arrayQUrlArgument = qUrlArgument.split("&"); final String[] arrayQUrlArgument = qUrlArgument.split("&");
for (final String arrayUrlArgStr : arrayQUrlArgument) { for (final String arrayUrlArgStr : arrayQUrlArgument) {
if (arrayUrlArgStr.startsWith(this.resumptionParam)) { if (arrayUrlArgStr.startsWith(this.resumptionParam)) {
@ -312,7 +335,7 @@ public class RestIterator implements Iterator<String> {
} }
} }
if (((emptyXml).equalsIgnoreCase(resultXml)) if (isEmptyXml(resultXml)
|| ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) { || ((nodeList != null) && (nodeList.getLength() < this.resultSizeValue))) {
// resumptionStr = ""; // resumptionStr = "";
if (nodeList != null) { if (nodeList != null) {
@ -331,13 +354,13 @@ public class RestIterator implements Iterator<String> {
case "pagination": case "pagination":
case "page": // pagination, iterate over page numbers case "page": // pagination, iterate over page numbers
this.pagination += 1; if (nodeList != null && nodeList.getLength() > 0) {
if (nodeList != null) {
this.discoverResultSize += nodeList.getLength(); this.discoverResultSize += nodeList.getLength();
} else { } else {
this.resultTotal = this.discoverResultSize; this.resultTotal = this.discoverResultSize;
this.pagination = this.discoverResultSize; this.pagination = this.discoverResultSize;
} }
this.pagination += 1;
this.resumptionInt = this.pagination; this.resumptionInt = this.pagination;
this.resumptionStr = Integer.toString(this.resumptionInt); this.resumptionStr = Integer.toString(this.resumptionInt);
break; break;
@ -415,6 +438,10 @@ public class RestIterator implements Iterator<String> {
} }
private boolean isEmptyXml(String s) {
return EMPTY_XML.equalsIgnoreCase(s);
}
private boolean isInteger(final String s) { private boolean isInteger(final String s) {
boolean isValidInteger = false; boolean isValidInteger = false;
try { try {