From 61a2551e74d18f35cfbfe3043e51899d478e3285 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 15 Mar 2021 17:17:55 +0100 Subject: [PATCH] migrated last changes from svn (dnet45) --- .../plugin/rest/RestCollectorPlugin.java | 19 +- .../collection/plugin/rest/RestIterator.java | 166 +++++++----------- 2 files changed, 83 insertions(+), 102 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestCollectorPlugin.java index ad8bfa4ea8..e59db143a5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestCollectorPlugin.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.collection.plugin.rest; +import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; import java.util.stream.Stream; @@ -23,6 +24,8 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; */ public class RestCollectorPlugin implements CollectorPlugin { + public static final String RESULT_SIZE_VALUE_DEFAULT = "100"; + private HttpClientParams clientParams; public RestCollectorPlugin(HttpClientParams clientParams) { @@ -32,6 +35,7 @@ public class RestCollectorPlugin implements CollectorPlugin { @Override public Stream collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException { final String baseUrl = api.getBaseUrl(); + final String resumptionType = api.getParams().get("resumptionType"); final String resumptionParam = api.getParams().get("resumptionParam"); final String resumptionXpath = api.getParams().get("resumptionXpath"); @@ -39,12 +43,14 @@ public class RestCollectorPlugin implements CollectorPlugin { final String resultFormatParam = api.getParams().get("resultFormatParam"); final String resultFormatValue = api.getParams().get("resultFormatValue"); final String resultSizeParam = api.getParams().get("resultSizeParam"); - final String resultSizeValue = (StringUtils.isBlank(api.getParams().get("resultSizeValue"))) ? "100" - : api.getParams().get("resultSizeValue"); final String queryParams = api.getParams().get("queryParams"); final String entityXpath = api.getParams().get("entityXpath"); final String authMethod = api.getParams().get("authMethod"); final String authToken = api.getParams().get("authToken"); + final String resultSizeValue = Optional + .ofNullable(api.getParams().get("resultSizeValue")) + .filter(StringUtils::isNotBlank) + .orElse(RESULT_SIZE_VALUE_DEFAULT); if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); @@ -65,6 +71,12 @@ public class RestCollectorPlugin implements CollectorPlugin { throw new CollectorException("Param 'entityXpath' is null or empty"); } + final String resultOutputFormat = Optional + .ofNullable(api.getParams().get("resultOutputFormat")) + .map(String::toLowerCase) + .filter(StringUtils::isNotBlank) + .orElse(resultFormatValue.toLowerCase()); + RestIterator it = new RestIterator( getClientParams(), baseUrl, @@ -79,7 +91,8 @@ public class RestCollectorPlugin implements CollectorPlugin { queryParams, entityXpath, authMethod, - authToken); + authToken, + resultOutputFormat); return StreamSupport .stream( diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java index b728293d54..fdefa67b8a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/rest/RestIterator.java @@ -1,6 +1,27 @@ package eu.dnetlib.dhp.collection.plugin.rest; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.HttpClientParams; +import eu.dnetlib.dhp.collection.JsonUtils; +import org.apache.avro.test.http.Http; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHeaders; +import org.apache.http.entity.ContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; + +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerConfigurationException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import javax.xml.xpath.*; import java.io.InputStream; import java.io.StringWriter; import java.io.UnsupportedEncodingException; @@ -12,30 +33,8 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.PriorityBlockingQueue; -import javax.xml.transform.OutputKeys; -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerConfigurationException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.dom.DOMSource; -import javax.xml.transform.stream.StreamResult; -import javax.xml.xpath.*; - -import org.apache.commons.httpclient.HttpMethod; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.http.HttpHeaders; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; -import org.xml.sax.InputSource; - -import eu.dnetlib.dhp.collection.CollectorException; -import eu.dnetlib.dhp.collection.HttpClientParams; -import eu.dnetlib.dhp.collection.JsonUtils; - /** - * log.debug(...) equal to log.trace(...) in the application-logs + * log.info(...) equal to log.trace(...) in the application-logs *

* known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue * @@ -45,7 +44,8 @@ import eu.dnetlib.dhp.collection.JsonUtils; */ public class RestIterator implements Iterator { - private static final Log log = LogFactory.getLog(RestIterator.class); + private static final Logger log = LoggerFactory.getLogger(RestIterator.class); + public static final String UTF_8 = "UTF-8"; private HttpClientParams clientParams; @@ -74,65 +74,15 @@ public class RestIterator implements Iterator { private String querySize; private String authMethod; private String authToken; - private final Queue recordQueue = new PriorityBlockingQueue(); + private Queue recordQueue = new PriorityBlockingQueue(); private int discoverResultSize = 0; private int pagination = 1; - - /** - * RestIterator class - * - * compatible to version before 1.3.33 - * - * @param baseUrl - * @param resumptionType - * @param resumptionParam - * @param resumptionXpath - * @param resultTotalXpath - * @param resultFormatParam - * @param resultFormatValue - * @param resultSizeParam - * @param resultSizeValueStr - * @param queryParams - * @param entityXpath + /* + * While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in + * json. useful for cases when the target API expects a resultFormatValue != json, but the results are returned in + * json. An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format. */ - public RestIterator( - final HttpClientParams clientParams, - final String baseUrl, - final String resumptionType, - final String resumptionParam, - final String resumptionXpath, - final String resultTotalXpath, - final String resultFormatParam, - final String resultFormatValue, - final String resultSizeParam, - final String resultSizeValueStr, - final String queryParams, - final String entityXpath) { - this(clientParams, baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath, - resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValueStr, queryParams, entityXpath, "", - ""); - } - - public RestIterator( - final HttpClientParams clientParams, - final String baseUrl, - final String resumptionType, - final String resumptionParam, - final String resumptionXpath, - final String resultTotalXpath, - final String resultFormatParam, - final String resultFormatValue, - final String resultSizeParam, - final String resultSizeValueStr, - final String queryParams, - final String entityXpath, - final String authMethod, - final String authToken, - final String resultOffsetParam) { - this(clientParams, baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath, - resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValueStr, queryParams, entityXpath, "", - ""); - } + private String resultOutputFormat; /** RestIterator class * compatible to version 1.3.33 @@ -151,17 +101,20 @@ public class RestIterator implements Iterator { final String queryParams, final String entityXpath, final String authMethod, - final String authToken) { + final String authToken, + final String resultOutputFormat) { + this.clientParams = clientParams; this.jsonUtils = new JsonUtils(); this.baseUrl = baseUrl; this.resumptionType = resumptionType; this.resumptionParam = resumptionParam; this.resultFormatValue = resultFormatValue; - this.queryParams = queryParams; this.resultSizeValue = Integer.valueOf(resultSizeValueStr); + this.queryParams = queryParams; this.authMethod = authMethod; this.authToken = authToken; + this.resultOutputFormat = resultOutputFormat; queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : ""; @@ -188,6 +141,7 @@ public class RestIterator implements Iterator { private void initQueue() { query = baseUrl + "?" + queryParams + querySize + queryFormat; + log.info("REST calls starting with " + query); } private void disconnect() { @@ -217,9 +171,7 @@ public class RestIterator implements Iterator { synchronized (recordQueue) { while (recordQueue.isEmpty() && !query.isEmpty()) { try { - log.debug("get Query: " + query); query = downloadPage(query); - log.debug("next queryURL from downloadPage(): " + query); } catch (CollectorException e) { log.debug("CollectorPlugin.next()-Exception: " + e); throw new RuntimeException(e); @@ -235,9 +187,12 @@ public class RestIterator implements Iterator { private String downloadPage(String query) throws CollectorException { String resultJson; String resultXml = ""; + String nextQuery = ""; String emptyXml = resultXml + "<" + JsonUtils.wrapName + ">"; Node resultNode = null; NodeList nodeList = null; + String qUrlArgument = ""; + int urlOldResumptionSize = 0; InputStream theHttpInputStream; // check if cursor=* is initial set otherwise add it to the queryParam URL @@ -249,20 +204,22 @@ public class RestIterator implements Iterator { } try { + log.info("requestig URL [{}]", query); + URL qUrl = new URL(query); log.debug("authMethod :" + authMethod); - if (this.authMethod == "bearer") { + if ("bearer".equalsIgnoreCase(this.authMethod)) { log.trace("authMethod before inputStream: " + resultXml); HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken); - conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, "application/json"); + conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); conn.setRequestMethod("GET"); theHttpInputStream = conn.getInputStream(); } else if (BASIC.equalsIgnoreCase(this.authMethod)) { log.trace("authMethod before inputStream: " + resultXml); HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken); - conn.setRequestProperty(HttpHeaders.ACCEPT, "application/xml"); + conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType()); conn.setRequestMethod("GET"); theHttpInputStream = conn.getInputStream(); } else { @@ -270,10 +227,10 @@ public class RestIterator implements Iterator { } resultStream = theHttpInputStream; - if ("json".equalsIgnoreCase(resultFormatValue)) { - resultJson = IOUtils.toString(resultStream, "UTF-8"); + if ("json".equals(resultOutputFormat)) { + resultJson = IOUtils.toString(resultStream, UTF_8); resultXml = jsonUtils.convertToXML(resultJson); - resultStream = IOUtils.toInputStream(resultXml, "UTF-8"); + resultStream = IOUtils.toInputStream(resultXml, UTF_8); } if (!(emptyXml).equalsIgnoreCase(resultXml)) { @@ -283,15 +240,19 @@ public class RestIterator implements Iterator { for (int i = 0; i < nodeList.getLength(); i++) { StringWriter sw = new StringWriter(); transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); - recordQueue.add(sw.toString()); + String toEnqueue = sw.toString(); + if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) { + log.warn("The following record resulted in empty item for the feeding queue: " + resultXml); + } else { + recordQueue.add(sw.toString()); + } } } else { - log.info("resultXml is equal with emptyXml"); + log.warn("resultXml is equal with emptyXml"); } resumptionInt += resultSizeValue; - String qUrlArgument = ""; switch (resumptionType.toLowerCase()) { case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items resumptionStr = xprResumptionPath.evaluate(resultNode); @@ -307,7 +268,6 @@ public class RestIterator implements Iterator { } qUrlArgument = qUrl.getQuery(); String[] arrayQUrlArgument = qUrlArgument.split("&"); - int urlOldResumptionSize = 0; for (String arrayUrlArgStr : arrayQUrlArgument) { if (arrayUrlArgStr.startsWith(resumptionParam)) { String[] resumptionKeyValue = arrayUrlArgStr.split("="); @@ -334,7 +294,7 @@ public class RestIterator implements Iterator { discoverResultSize += nodeList.getLength(); } } - log.debug("discoverResultSize: " + discoverResultSize); + log.info("discoverResultSize: {}", discoverResultSize); break; case "pagination": @@ -384,25 +344,24 @@ public class RestIterator implements Iterator { } } catch (Exception e) { - log.error(e); + log.error(e.getMessage(), e); throw new IllegalStateException("collection failed: " + e.getMessage()); } try { if (resultTotal == -1) { resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode)); - if (resumptionType.toLowerCase().equals("page") && !BASIC.equalsIgnoreCase(authMethod)) { + if (resumptionType.equalsIgnoreCase("page") && !BASIC.equalsIgnoreCase(authMethod)) { resultTotal += 1; } // to correct the upper bound log.info("resultTotal was -1 is now: " + resultTotal); } } catch (Exception e) { - log.error(e); + log.error(e.getMessage(), e); throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); } log.debug("resultTotal: " + resultTotal); log.debug("resInt: " + resumptionInt); - String nextQuery; if (resumptionInt <= resultTotal) { nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat; @@ -413,6 +372,7 @@ public class RestIterator implements Iterator { } log.debug("nextQueryUrl: " + nextQuery); return nextQuery; + } private boolean isInteger(String s) { @@ -439,4 +399,12 @@ public class RestIterator implements Iterator { } } + public String getResultFormatValue() { + return resultFormatValue; + } + + public String getResultOutputFormat() { + return resultOutputFormat; + } + }