master #59

Closed
claudio.atzori wants to merge 3221 commits from master into stable_ids
2 changed files with 83 additions and 102 deletions
Showing only changes of commit 61a2551e74 - Show all commits

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.collection.plugin.rest; package eu.dnetlib.dhp.collection.plugin.rest;
import java.util.Optional;
import java.util.Spliterator; import java.util.Spliterator;
import java.util.Spliterators; import java.util.Spliterators;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -23,6 +24,8 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
*/ */
public class RestCollectorPlugin implements CollectorPlugin { public class RestCollectorPlugin implements CollectorPlugin {
public static final String RESULT_SIZE_VALUE_DEFAULT = "100";
private HttpClientParams clientParams; private HttpClientParams clientParams;
public RestCollectorPlugin(HttpClientParams clientParams) { public RestCollectorPlugin(HttpClientParams clientParams) {
@ -32,6 +35,7 @@ public class RestCollectorPlugin implements CollectorPlugin {
@Override @Override
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException { public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
final String baseUrl = api.getBaseUrl(); final String baseUrl = api.getBaseUrl();
final String resumptionType = api.getParams().get("resumptionType"); final String resumptionType = api.getParams().get("resumptionType");
final String resumptionParam = api.getParams().get("resumptionParam"); final String resumptionParam = api.getParams().get("resumptionParam");
final String resumptionXpath = api.getParams().get("resumptionXpath"); final String resumptionXpath = api.getParams().get("resumptionXpath");
@ -39,12 +43,14 @@ public class RestCollectorPlugin implements CollectorPlugin {
final String resultFormatParam = api.getParams().get("resultFormatParam"); final String resultFormatParam = api.getParams().get("resultFormatParam");
final String resultFormatValue = api.getParams().get("resultFormatValue"); final String resultFormatValue = api.getParams().get("resultFormatValue");
final String resultSizeParam = api.getParams().get("resultSizeParam"); final String resultSizeParam = api.getParams().get("resultSizeParam");
final String resultSizeValue = (StringUtils.isBlank(api.getParams().get("resultSizeValue"))) ? "100"
: api.getParams().get("resultSizeValue");
final String queryParams = api.getParams().get("queryParams"); final String queryParams = api.getParams().get("queryParams");
final String entityXpath = api.getParams().get("entityXpath"); final String entityXpath = api.getParams().get("entityXpath");
final String authMethod = api.getParams().get("authMethod"); final String authMethod = api.getParams().get("authMethod");
final String authToken = api.getParams().get("authToken"); final String authToken = api.getParams().get("authToken");
final String resultSizeValue = Optional
.ofNullable(api.getParams().get("resultSizeValue"))
.filter(StringUtils::isNotBlank)
.orElse(RESULT_SIZE_VALUE_DEFAULT);
if (StringUtils.isBlank(baseUrl)) { if (StringUtils.isBlank(baseUrl)) {
throw new CollectorException("Param 'baseUrl' is null or empty"); throw new CollectorException("Param 'baseUrl' is null or empty");
@ -65,6 +71,12 @@ public class RestCollectorPlugin implements CollectorPlugin {
throw new CollectorException("Param 'entityXpath' is null or empty"); throw new CollectorException("Param 'entityXpath' is null or empty");
} }
final String resultOutputFormat = Optional
.ofNullable(api.getParams().get("resultOutputFormat"))
.map(String::toLowerCase)
.filter(StringUtils::isNotBlank)
.orElse(resultFormatValue.toLowerCase());
RestIterator it = new RestIterator( RestIterator it = new RestIterator(
getClientParams(), getClientParams(),
baseUrl, baseUrl,
@ -79,7 +91,8 @@ public class RestCollectorPlugin implements CollectorPlugin {
queryParams, queryParams,
entityXpath, entityXpath,
authMethod, authMethod,
authToken); authToken,
resultOutputFormat);
return StreamSupport return StreamSupport
.stream( .stream(

View File

@ -1,6 +1,27 @@
package eu.dnetlib.dhp.collection.plugin.rest; package eu.dnetlib.dhp.collection.plugin.rest;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.JsonUtils;
import org.apache.avro.test.http.Http;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.ContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.*;
import java.io.InputStream; import java.io.InputStream;
import java.io.StringWriter; import java.io.StringWriter;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -12,30 +33,8 @@ import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.*;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHeaders;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.JsonUtils;
/** /**
* log.debug(...) equal to log.trace(...) in the application-logs * log.info(...) equal to log.trace(...) in the application-logs
* <p> * <p>
* known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue * known bug: at resumptionType 'discover' if the (resultTotal % resultSizeValue) == 0 the collecting fails -> change the resultSizeValue
* *
@ -45,7 +44,8 @@ import eu.dnetlib.dhp.collection.JsonUtils;
*/ */
public class RestIterator implements Iterator<String> { public class RestIterator implements Iterator<String> {
private static final Log log = LogFactory.getLog(RestIterator.class); private static final Logger log = LoggerFactory.getLogger(RestIterator.class);
public static final String UTF_8 = "UTF-8";
private HttpClientParams clientParams; private HttpClientParams clientParams;
@ -74,65 +74,15 @@ public class RestIterator implements Iterator<String> {
private String querySize; private String querySize;
private String authMethod; private String authMethod;
private String authToken; private String authToken;
private final Queue<String> recordQueue = new PriorityBlockingQueue<String>(); private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
private int discoverResultSize = 0; private int discoverResultSize = 0;
private int pagination = 1; private int pagination = 1;
/*
/** * While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in
* RestIterator class * json. useful for cases when the target API expects a resultFormatValue != json, but the results are returned in
* * json. An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format.
* compatible to version before 1.3.33
*
* @param baseUrl
* @param resumptionType
* @param resumptionParam
* @param resumptionXpath
* @param resultTotalXpath
* @param resultFormatParam
* @param resultFormatValue
* @param resultSizeParam
* @param resultSizeValueStr
* @param queryParams
* @param entityXpath
*/ */
public RestIterator( private String resultOutputFormat;
final HttpClientParams clientParams,
final String baseUrl,
final String resumptionType,
final String resumptionParam,
final String resumptionXpath,
final String resultTotalXpath,
final String resultFormatParam,
final String resultFormatValue,
final String resultSizeParam,
final String resultSizeValueStr,
final String queryParams,
final String entityXpath) {
this(clientParams, baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath,
resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValueStr, queryParams, entityXpath, "",
"");
}
public RestIterator(
final HttpClientParams clientParams,
final String baseUrl,
final String resumptionType,
final String resumptionParam,
final String resumptionXpath,
final String resultTotalXpath,
final String resultFormatParam,
final String resultFormatValue,
final String resultSizeParam,
final String resultSizeValueStr,
final String queryParams,
final String entityXpath,
final String authMethod,
final String authToken,
final String resultOffsetParam) {
this(clientParams, baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath,
resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValueStr, queryParams, entityXpath, "",
"");
}
/** RestIterator class /** RestIterator class
* compatible to version 1.3.33 * compatible to version 1.3.33
@ -151,17 +101,20 @@ public class RestIterator implements Iterator<String> {
final String queryParams, final String queryParams,
final String entityXpath, final String entityXpath,
final String authMethod, final String authMethod,
final String authToken) { final String authToken,
final String resultOutputFormat) {
this.clientParams = clientParams; this.clientParams = clientParams;
this.jsonUtils = new JsonUtils(); this.jsonUtils = new JsonUtils();
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
this.resumptionType = resumptionType; this.resumptionType = resumptionType;
this.resumptionParam = resumptionParam; this.resumptionParam = resumptionParam;
this.resultFormatValue = resultFormatValue; this.resultFormatValue = resultFormatValue;
this.queryParams = queryParams;
this.resultSizeValue = Integer.valueOf(resultSizeValueStr); this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
this.queryParams = queryParams;
this.authMethod = authMethod; this.authMethod = authMethod;
this.authToken = authToken; this.authToken = authToken;
this.resultOutputFormat = resultOutputFormat;
queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue
: ""; : "";
@ -188,6 +141,7 @@ public class RestIterator implements Iterator<String> {
private void initQueue() { private void initQueue() {
query = baseUrl + "?" + queryParams + querySize + queryFormat; query = baseUrl + "?" + queryParams + querySize + queryFormat;
log.info("REST calls starting with " + query);
} }
private void disconnect() { private void disconnect() {
@ -217,9 +171,7 @@ public class RestIterator implements Iterator<String> {
synchronized (recordQueue) { synchronized (recordQueue) {
while (recordQueue.isEmpty() && !query.isEmpty()) { while (recordQueue.isEmpty() && !query.isEmpty()) {
try { try {
log.debug("get Query: " + query);
query = downloadPage(query); query = downloadPage(query);
log.debug("next queryURL from downloadPage(): " + query);
} catch (CollectorException e) { } catch (CollectorException e) {
log.debug("CollectorPlugin.next()-Exception: " + e); log.debug("CollectorPlugin.next()-Exception: " + e);
throw new RuntimeException(e); throw new RuntimeException(e);
@ -235,9 +187,12 @@ public class RestIterator implements Iterator<String> {
private String downloadPage(String query) throws CollectorException { private String downloadPage(String query) throws CollectorException {
String resultJson; String resultJson;
String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"; String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
String nextQuery = "";
String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">"; String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
Node resultNode = null; Node resultNode = null;
NodeList nodeList = null; NodeList nodeList = null;
String qUrlArgument = "";
int urlOldResumptionSize = 0;
InputStream theHttpInputStream; InputStream theHttpInputStream;
// check if cursor=* is initial set otherwise add it to the queryParam URL // check if cursor=* is initial set otherwise add it to the queryParam URL
@ -249,20 +204,22 @@ public class RestIterator implements Iterator<String> {
} }
try { try {
log.info("requestig URL [{}]", query);
URL qUrl = new URL(query); URL qUrl = new URL(query);
log.debug("authMethod :" + authMethod); log.debug("authMethod :" + authMethod);
if (this.authMethod == "bearer") { if ("bearer".equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: " + resultXml); log.trace("authMethod before inputStream: " + resultXml);
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken); conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + authToken);
conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, "application/json"); conn.setRequestProperty(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
conn.setRequestMethod("GET"); conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream(); theHttpInputStream = conn.getInputStream();
} else if (BASIC.equalsIgnoreCase(this.authMethod)) { } else if (BASIC.equalsIgnoreCase(this.authMethod)) {
log.trace("authMethod before inputStream: " + resultXml); log.trace("authMethod before inputStream: " + resultXml);
HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection(); HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken); conn.setRequestProperty(HttpHeaders.AUTHORIZATION, "Basic " + authToken);
conn.setRequestProperty(HttpHeaders.ACCEPT, "application/xml"); conn.setRequestProperty(HttpHeaders.ACCEPT, ContentType.APPLICATION_XML.getMimeType());
conn.setRequestMethod("GET"); conn.setRequestMethod("GET");
theHttpInputStream = conn.getInputStream(); theHttpInputStream = conn.getInputStream();
} else { } else {
@ -270,10 +227,10 @@ public class RestIterator implements Iterator<String> {
} }
resultStream = theHttpInputStream; resultStream = theHttpInputStream;
if ("json".equalsIgnoreCase(resultFormatValue)) { if ("json".equals(resultOutputFormat)) {
resultJson = IOUtils.toString(resultStream, "UTF-8"); resultJson = IOUtils.toString(resultStream, UTF_8);
resultXml = jsonUtils.convertToXML(resultJson); resultXml = jsonUtils.convertToXML(resultJson);
resultStream = IOUtils.toInputStream(resultXml, "UTF-8"); resultStream = IOUtils.toInputStream(resultXml, UTF_8);
} }
if (!(emptyXml).equalsIgnoreCase(resultXml)) { if (!(emptyXml).equalsIgnoreCase(resultXml)) {
@ -283,15 +240,19 @@ public class RestIterator implements Iterator<String> {
for (int i = 0; i < nodeList.getLength(); i++) { for (int i = 0; i < nodeList.getLength(); i++) {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw)); transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
String toEnqueue = sw.toString();
if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)) {
log.warn("The following record resulted in empty item for the feeding queue: " + resultXml);
} else {
recordQueue.add(sw.toString()); recordQueue.add(sw.toString());
} }
}
} else { } else {
log.info("resultXml is equal with emptyXml"); log.warn("resultXml is equal with emptyXml");
} }
resumptionInt += resultSizeValue; resumptionInt += resultSizeValue;
String qUrlArgument = "";
switch (resumptionType.toLowerCase()) { switch (resumptionType.toLowerCase()) {
case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items case "scan": // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
resumptionStr = xprResumptionPath.evaluate(resultNode); resumptionStr = xprResumptionPath.evaluate(resultNode);
@ -307,7 +268,6 @@ public class RestIterator implements Iterator<String> {
} }
qUrlArgument = qUrl.getQuery(); qUrlArgument = qUrl.getQuery();
String[] arrayQUrlArgument = qUrlArgument.split("&"); String[] arrayQUrlArgument = qUrlArgument.split("&");
int urlOldResumptionSize = 0;
for (String arrayUrlArgStr : arrayQUrlArgument) { for (String arrayUrlArgStr : arrayQUrlArgument) {
if (arrayUrlArgStr.startsWith(resumptionParam)) { if (arrayUrlArgStr.startsWith(resumptionParam)) {
String[] resumptionKeyValue = arrayUrlArgStr.split("="); String[] resumptionKeyValue = arrayUrlArgStr.split("=");
@ -334,7 +294,7 @@ public class RestIterator implements Iterator<String> {
discoverResultSize += nodeList.getLength(); discoverResultSize += nodeList.getLength();
} }
} }
log.debug("discoverResultSize: " + discoverResultSize); log.info("discoverResultSize: {}", discoverResultSize);
break; break;
case "pagination": case "pagination":
@ -384,25 +344,24 @@ public class RestIterator implements Iterator<String> {
} }
} catch (Exception e) { } catch (Exception e) {
log.error(e); log.error(e.getMessage(), e);
throw new IllegalStateException("collection failed: " + e.getMessage()); throw new IllegalStateException("collection failed: " + e.getMessage());
} }
try { try {
if (resultTotal == -1) { if (resultTotal == -1) {
resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode)); resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
if (resumptionType.toLowerCase().equals("page") && !BASIC.equalsIgnoreCase(authMethod)) { if (resumptionType.equalsIgnoreCase("page") && !BASIC.equalsIgnoreCase(authMethod)) {
resultTotal += 1; resultTotal += 1;
} // to correct the upper bound } // to correct the upper bound
log.info("resultTotal was -1 is now: " + resultTotal); log.info("resultTotal was -1 is now: " + resultTotal);
} }
} catch (Exception e) { } catch (Exception e) {
log.error(e); log.error(e.getMessage(), e);
throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage()); throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
} }
log.debug("resultTotal: " + resultTotal); log.debug("resultTotal: " + resultTotal);
log.debug("resInt: " + resumptionInt); log.debug("resInt: " + resumptionInt);
String nextQuery;
if (resumptionInt <= resultTotal) { if (resumptionInt <= resultTotal) {
nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr
+ queryFormat; + queryFormat;
@ -413,6 +372,7 @@ public class RestIterator implements Iterator<String> {
} }
log.debug("nextQueryUrl: " + nextQuery); log.debug("nextQueryUrl: " + nextQuery);
return nextQuery; return nextQuery;
} }
private boolean isInteger(String s) { private boolean isInteger(String s) {
@ -439,4 +399,12 @@ public class RestIterator implements Iterator<String> {
} }
} }
/**
 * Returns the result format value this iterator was constructed with.
 * When a result format parameter is configured, this is the value that is
 * appended to the request URL under that parameter.
 *
 * @return the configured result format value
 */
public String getResultFormatValue() {
	return this.resultFormatValue;
}
/**
 * Returns the format in which the remote API is expected to deliver its
 * results. This may differ from {@link #getResultFormatValue()}: when the
 * output format is {@code "json"}, each downloaded page is converted to XML
 * before records are extracted from it.
 *
 * @return the result output format passed to the constructor
 */
public String getResultOutputFormat() {
	return this.resultOutputFormat;
}
} }