From 0e8a4f9f1acbb03d1ec8c5cefcc6caff053cb532 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 3 Feb 2021 12:33:41 +0100 Subject: [PATCH] better logging, WIP: collectorWorker error reporting --- .../dhp/application/ApplicationUtils.java | 21 +++++ .../mdstore/MDStoreActionNode.java | 32 +++---- .../collection/plugin/CollectorPlugin.java | 3 + .../plugin/oai/OaiCollectorPlugin.java | 21 ++++- .../collection/plugin/oai/OaiIterator.java | 17 +++- .../plugin/oai/OaiIteratorFactory.java | 6 +- .../collection/worker/CollectorWorker.java | 87 +++++++++---------- .../worker/CollectorWorkerApplication.java | 20 +++-- .../worker/utils/CollectorPluginFactory.java | 2 +- .../worker/utils/HttpConnector.java | 84 +++++++----------- .../DnetCollectorWorkerApplicationTests.java | 2 +- 11 files changed, 159 insertions(+), 136 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java new file mode 100644 index 000000000..531c13af3 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java @@ -0,0 +1,21 @@ + +package eu.dnetlib.dhp.application; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.Properties; + +public class ApplicationUtils { + + public static void populateOOZIEEnv(final String paramName, String value) throws Exception { + File file = new File(System.getProperty("oozie.action.output.properties")); + Properties props = new Properties(); + + props.setProperty(paramName, value); + OutputStream os = new FileOutputStream(file); + props.store(os, ""); + os.close(); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java index 6cb0537b2..3e471cfc8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java @@ -1,6 +1,9 @@ package eu.dnetlib.dhp.aggregation.mdstore; +import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; +import static eu.dnetlib.dhp.application.ApplicationUtils.*; + import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; @@ -16,11 +19,8 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.worker.CollectorWorker; import eu.dnetlib.dhp.common.rest.DNetRestClient; public class MDStoreActionNode { @@ -28,11 +28,8 @@ public class MDStoreActionNode { enum MDAction { NEW_VERSION, ROLLBACK, COMMIT, READ_LOCK, READ_UNLOCK - } - private static final ObjectMapper mapper = new ObjectMapper(); - public static String NEW_VERSION_URI = "%s/mdstore/%s/newVersion"; public static final String COMMIT_VERSION_URL = "%s/version/%s/commit/%s"; @@ -48,13 +45,13 @@ public class MDStoreActionNode { final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser( IOUtils .toString( - CollectorWorker.class + MDStoreActionNode.class .getResourceAsStream( "/eu/dnetlib/dhp/collection/mdstore_action_parameters.json"))); argumentParser.parseArgument(args); final MDAction action = MDAction.valueOf(argumentParser.get("action")); - log.info("Curren action is {}", action); + log.info("Current action is {}", action); final String mdStoreManagerURI = argumentParser.get("mdStoreManagerURI"); log.info("mdStoreManagerURI is {}", mdStoreManagerURI); @@ -67,7 +64,7 @@ public class MDStoreActionNode { } final MDStoreVersion currentVersion = DNetRestClient .doGET(String.format(NEW_VERSION_URI, mdStoreManagerURI, mdStoreID), MDStoreVersion.class); - populateOOZIEEnv(MDSTOREVERSIONPARAM, mapper.writeValueAsString(currentVersion)); + populateOOZIEEnv(MDSTOREVERSIONPARAM, MAPPER.writeValueAsString(currentVersion)); break; } case COMMIT: { @@ -77,7 +74,7 @@ public class MDStoreActionNode { throw new IllegalArgumentException("missing or empty argument namenode"); } final String mdStoreVersion_params = argumentParser.get("mdStoreVersion"); - final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class); + final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class); if (StringUtils.isBlank(mdStoreVersion.getId())) { throw new IllegalArgumentException( @@ -110,7 +107,7 @@ public class MDStoreActionNode { } case ROLLBACK: { final String mdStoreVersion_params = argumentParser.get("mdStoreVersion"); - final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class); + final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class); if (StringUtils.isBlank(mdStoreVersion.getId())) { throw new IllegalArgumentException( @@ -127,12 +124,12 @@ public class MDStoreActionNode { } final MDStoreVersion currentVersion = DNetRestClient .doGET(String.format(READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class); - populateOOZIEEnv(MDSTOREREADLOCKPARAM, mapper.writeValueAsString(currentVersion)); + populateOOZIEEnv(MDSTOREREADLOCKPARAM, MAPPER.writeValueAsString(currentVersion)); break; } case READ_UNLOCK: { final String mdStoreVersion_params = argumentParser.get("readMDStoreId"); - final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class); + final MDStoreVersion mdStoreVersion = MAPPER.readValue(mdStoreVersion_params, MDStoreVersion.class); if (StringUtils.isBlank(mdStoreVersion.getId())) { throw new IllegalArgumentException( @@ -148,13 +145,4 @@ public class MDStoreActionNode { } - public static void populateOOZIEEnv(final String paramName, String value) throws Exception { - File file = new File(System.getProperty("oozie.action.output.properties")); - Properties props = new Properties(); - - props.setProperty(paramName, value); - OutputStream os = new FileOutputStream(file); - props.store(os, ""); - os.close(); - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index ba9bd662e..a0c546858 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -4,9 +4,12 @@ package eu.dnetlib.dhp.collection.plugin; import java.util.stream.Stream; import eu.dnetlib.dhp.collection.worker.CollectorException; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor; public interface CollectorPlugin { Stream collect(ApiDescriptor api) throws CollectorException; + + CollectorPluginErrorLogList getCollectionErrors(); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java index a5e261553..ea74919c5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java @@ -9,12 +9,15 @@ import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.jetbrains.annotations.NotNull; + import com.google.common.base.Splitter; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; import eu.dnetlib.dhp.collection.worker.CollectorException; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor; public class OaiCollectorPlugin implements CollectorPlugin { @@ -26,8 +29,19 @@ public class OaiCollectorPlugin implements CollectorPlugin { private OaiIteratorFactory oaiIteratorFactory; + private final CollectorPluginErrorLogList errorLogList = new CollectorPluginErrorLogList(); + @Override public Stream collect(final ApiDescriptor api) throws CollectorException { + try { + return doCollect(api); + } catch (CollectorException e) { + errorLogList.add(e.getMessage()); + throw e; + } + } + + private Stream doCollect(ApiDescriptor api) throws CollectorException { final String baseUrl = api.getBaseUrl(); final String mdFormat = api.getParams().get(FORMAT_PARAM); final String setParam = api.getParams().get(OAI_SET_PARAM); @@ -65,7 +79,7 @@ public class OaiCollectorPlugin implements CollectorPlugin { .stream() .map( set -> getOaiIteratorFactory() - .newIterator(baseUrl, mdFormat, set, fromDate, untilDate)) + .newIterator(baseUrl, mdFormat, set, fromDate, untilDate, errorLogList)) .iterator(); return StreamSupport @@ -79,4 +93,9 @@ public class OaiCollectorPlugin implements CollectorPlugin { } return oaiIteratorFactory; } + + @Override + public CollectorPluginErrorLogList getCollectionErrors() { + return errorLogList; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java index e54bae67d..2392dee6a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java @@ -15,15 +15,17 @@ import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.collection.worker.CollectorException; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner; public class OaiIterator implements Iterator { - private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on - // 11/24/08 5:02 PM + private static final Logger log = LoggerFactory.getLogger(OaiIterator.class); private final Queue queue = new PriorityBlockingQueue<>(); private final SAXReader reader = new SAXReader(); @@ -36,6 +38,7 @@ public class OaiIterator implements Iterator { private String token; private boolean started; private final HttpConnector httpConnector; + private CollectorPluginErrorLogList errorLogList; public OaiIterator( final String baseUrl, @@ -43,7 +46,8 @@ public class OaiIterator implements Iterator { final String set, final String fromDate, final String untilDate, - final HttpConnector httpConnector) { + final HttpConnector httpConnector, + final CollectorPluginErrorLogList errorLogList) { this.baseUrl = baseUrl; this.mdFormat = mdFormat; this.set = set; @@ -51,6 +55,7 @@ public class OaiIterator implements Iterator { this.untilDate = untilDate; this.started = false; this.httpConnector = httpConnector; + this.errorLogList = errorLogList; } private void verifyStarted() { @@ -139,7 +144,7 @@ public class OaiIterator implements Iterator { private String downloadPage(final String url) throws CollectorException { - final String xml = httpConnector.getInputSource(url); + final String xml = httpConnector.getInputSource(url, errorLogList); Document doc; try { doc = reader.read(new StringReader(xml)); @@ -174,4 +179,8 @@ public class OaiIterator implements Iterator { return doc.valueOf("//*[local-name()='resumptionToken']"); } + + public CollectorPluginErrorLogList getErrorLogList() { + return errorLogList; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java index 4a6ea7f67..eafd265d4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection.plugin.oai; import java.util.Iterator; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; public class OaiIteratorFactory { @@ -14,8 +15,9 @@ public class OaiIteratorFactory { final String mdFormat, final String set, final String fromDate, - final String untilDate) { - return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector()); + final String untilDate, + final CollectorPluginErrorLogList errorLogList) { + return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(), errorLogList); } private HttpConnector getHttpConnector() { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java index 3605bdfd6..7033cfd8e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory; import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor; @@ -22,69 +23,65 @@ public class CollectorWorker { private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class); - private final CollectorPluginFactory collectorPluginFactory; - private final ApiDescriptor api; private final String hdfsuri; private final String hdfsPath; + private CollectorPlugin plugin; + public CollectorWorker( - final CollectorPluginFactory collectorPluginFactory, final ApiDescriptor api, final String hdfsuri, - final String hdfsPath) { - this.collectorPluginFactory = collectorPluginFactory; + final String hdfsPath) throws CollectorException { this.api = api; this.hdfsuri = hdfsuri; this.hdfsPath = hdfsPath; - + this.plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol()); } - public void collect() throws CollectorException { - try { - final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol()); + public CollectorPluginErrorLogList collect() throws IOException, CollectorException { - // ====== Init HDFS File System Object - Configuration conf = new Configuration(); - // Set FileSystem URI - conf.set("fs.defaultFS", hdfsuri); - // Because of Maven - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + // ====== Init HDFS File System Object + Configuration conf = new Configuration(); + // Set FileSystem URI + conf.set("fs.defaultFS", hdfsuri); + // Because of Maven + conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - System.setProperty("hadoop.home.dir", "/"); - // Get the filesystem - HDFS - FileSystem.get(URI.create(hdfsuri), conf); - Path hdfswritepath = new Path(hdfsPath); + System.setProperty("hadoop.home.dir", "/"); + // Get the filesystem - HDFS - log.info("Created path " + hdfswritepath.toString()); + FileSystem.get(URI.create(hdfsuri), conf); + Path hdfswritepath = new Path(hdfsPath); - final AtomicInteger counter = new AtomicInteger(0); - try (SequenceFile.Writer writer = SequenceFile - .createWriter( - conf, - SequenceFile.Writer.file(hdfswritepath), - SequenceFile.Writer.keyClass(IntWritable.class), - SequenceFile.Writer.valueClass(Text.class))) { - final IntWritable key = new IntWritable(counter.get()); - final Text value = new Text(); - plugin - .collect(api) - .forEach( - content -> { - key.set(counter.getAndIncrement()); - value.set(content); - try { - writer.append(key, value); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - } catch (Throwable e) { - throw new CollectorException("Error on collecting ", e); + log.info("Created path " + hdfswritepath.toString()); + + final AtomicInteger counter = new AtomicInteger(0); + try (SequenceFile.Writer writer = SequenceFile + .createWriter( + conf, + SequenceFile.Writer.file(hdfswritepath), + SequenceFile.Writer.keyClass(IntWritable.class), + SequenceFile.Writer.valueClass(Text.class))) { + final IntWritable key = new IntWritable(counter.get()); + final Text value = new Text(); + plugin + .collect(api) + .forEach( + content -> { + key.set(counter.getAndIncrement()); + value.set(content); + try { + writer.append(key, value); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } finally { + return plugin.getCollectionErrors(); } } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java index da5b197d6..1d99689db 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java @@ -2,6 +2,8 @@ package eu.dnetlib.dhp.collection.worker; import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*; +import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; +import static eu.dnetlib.dhp.application.ApplicationUtils.*; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; @@ -10,7 +12,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; +import eu.dnetlib.dhp.aggregation.common.AggregationUtility; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory; import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor; @@ -25,8 +29,6 @@ public class CollectorWorkerApplication { private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class); - private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory(); - /** * @param args */ @@ -49,14 +51,16 @@ public class CollectorWorkerApplication { final String mdStoreVersion = argumentParser.get("mdStoreVersion"); log.info("mdStoreVersion is {}", mdStoreVersion); - final ObjectMapper jsonMapper = new ObjectMapper(); + final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); + final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME; + log.info("hdfs path is {}", hdfsPath); - final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class); + final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class); - final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class); - final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, - currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME); - worker.collect(); + final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath); + CollectorPluginErrorLogList errors = worker.collect(); + + populateOOZIEEnv("collectorErrors", errors.toString()); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java index 6b070b191..7cbcd9b5c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java @@ -7,7 +7,7 @@ import eu.dnetlib.dhp.collection.worker.CollectorException; public class CollectorPluginFactory { - public CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException { + public static CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException { if (protocol == null) throw new CollectorException("protocol cannot be null"); switch (protocol.toLowerCase().trim()) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java index ff3c18aba..fc45b4814 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java @@ -16,14 +16,14 @@ import javax.net.ssl.X509TrustManager; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.math.NumberUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.collection.worker.CollectorException; public class HttpConnector { - private static final Log log = LogFactory.getLog(HttpConnector.class); + private static final Logger log = LoggerFactory.getLogger(HttpConnector.class); private int maxNumberOfRetry = 6; private int defaultDelay = 120; // seconds @@ -45,7 +45,20 @@ public class HttpConnector { * @throws CollectorException when retrying more than maxNumberOfRetry times */ public String getInputSource(final String requestUrl) throws CollectorException { - return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList()); + return attemptDownloadAsString(requestUrl, 1, new CollectorPluginErrorLogList()); + } + + /** + * Given the URL returns the content via HTTP GET + * + * @param requestUrl the URL + * @param errorLogList the list of errors + * @return the content of the downloaded resource + * @throws CollectorException when retrying more than maxNumberOfRetry times + */ + public String getInputSource(final String requestUrl, CollectorPluginErrorLogList errorLogList) + throws CollectorException { + return attemptDownloadAsString(requestUrl, 1, errorLogList); } /** @@ -59,18 +72,20 @@ public class HttpConnector { return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); } - private String attemptDownlaodAsString( + private String attemptDownloadAsString( final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList) throws CollectorException { + + log.info("requesting URL [{}]", requestUrl); try { final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); try { return IOUtils.toString(s); } catch (final IOException e) { - log.error("error while retrieving from http-connection occured: " + requestUrl, e); + log.error("error while retrieving from http-connection occurred: {}", requestUrl, e); Thread.sleep(defaultDelay * 1000); errorList.add(e.getMessage()); - return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList); + return attemptDownloadAsString(requestUrl, retryNumber + 1, errorList); } finally { IOUtils.closeQuietly(s); } @@ -87,7 +102,7 @@ public class HttpConnector { throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList); } - log.debug("Downloading " + requestUrl + " - try: " + retryNumber); + log.debug("requesting URL [{}], try {}", requestUrl, retryNumber); try { InputStream input = null; @@ -103,7 +118,7 @@ public class HttpConnector { final int retryAfter = obtainRetryAfter(urlConn.getHeaderFields()); if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) { - log.warn("waiting and repeating request after " + retryAfter + " sec."); + log.warn("waiting and repeating request after {} sec.", retryAfter); Thread.sleep(retryAfter * 1000); errorList.add("503 Service Unavailable"); urlConn.disconnect(); @@ -111,7 +126,7 @@ public class HttpConnector { } else if (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM || urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP) { final String newUrl = obtainNewLocation(urlConn.getHeaderFields()); - log.debug("The requested url has been moved to " + newUrl); + log.debug("The requested url has been moved to {}", newUrl); errorList .add( String @@ -121,15 +136,11 @@ public class HttpConnector { urlConn.disconnect(); return attemptDownload(newUrl, retryNumber + 1, errorList); } else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { - log - .error( - String - .format( - "HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage())); + final String msg = String + .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()); + log.error(msg); Thread.sleep(defaultDelay * 1000); - errorList - .add( - String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage())); + errorList.add(msg); urlConn.disconnect(); return attemptDownload(requestUrl, retryNumber + 1, errorList); } else { @@ -138,7 +149,7 @@ public class HttpConnector { return input; } } catch (final IOException e) { - log.error("error while retrieving from http-connection occured: " + requestUrl, e); + log.error("error while retrieving from http-connection occurred: {}", requestUrl, e); Thread.sleep(defaultDelay * 1000); errorList.add(e.getMessage()); return attemptDownload(requestUrl, retryNumber + 1, errorList); @@ -149,12 +160,12 @@ public class HttpConnector { } private void logHeaderFields(final HttpURLConnection urlConn) throws IOException { - log.debug("StatusCode: " + urlConn.getResponseMessage()); + log.debug("StatusCode: {}", urlConn.getResponseMessage()); for (final Map.Entry> e : urlConn.getHeaderFields().entrySet()) { if (e.getKey() != null) { for (final String v : e.getValue()) { - log.debug(" key: " + e.getKey() + " - value: " + v); + log.debug(" key: {} value: {}", e.getKey(), v); } } } @@ -183,37 +194,6 @@ public class HttpConnector { "The requested url has been MOVED, but 'location' param is MISSING"); } - /** - * register for https scheme; this is a workaround and not intended for the use in trusted environments - */ - public void initTrustManager() { - final X509TrustManager tm = new X509TrustManager() { - - @Override - public void checkClientTrusted(final X509Certificate[] xcs, final String string) { - } - - @Override - public void checkServerTrusted(final X509Certificate[] xcs, final String string) { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return null; - } - }; - try { - final SSLContext ctx = SSLContext.getInstance("TLS"); - ctx.init(null, new TrustManager[] { - tm - }, null); - HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); - } catch (final GeneralSecurityException e) { - log.fatal(e); - throw new IllegalStateException(e); - } - } - public int getMaxNumberOfRetry() { return maxNumberOfRetry; } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java index 9abfbacac..10964096c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java @@ -40,7 +40,7 @@ public class DnetCollectorWorkerApplicationTests { public void testFeeding(@TempDir Path testDir) throws Exception { System.out.println(testDir.toString()); - CollectorWorker worker = new CollectorWorker(new CollectorPluginFactory(), getApi(), + CollectorWorker worker = new CollectorWorker(getApi(), "file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq"); worker.collect();