diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
index 72c41a062..c78fb1b1f 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
@@ -2,18 +2,43 @@
 package eu.dnetlib.dhp.application;
 
 import java.io.*;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+
 public class ApplicationUtils {
 
-	public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
+	public static Configuration getHadoopConfiguration(String nameNode) {
+		// ====== Init HDFS File System Object
+		Configuration conf = new Configuration();
+		// Set FileSystem URI
+		conf.set("fs.defaultFS", nameNode);
+		// Because of Maven
+		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+		System.setProperty("hadoop.home.dir", "/");
+		return conf;
+	}
+
+	public static void populateOOZIEEnv(final Map<String, String> report) throws IOException {
 		File file = new File(System.getProperty("oozie.action.output.properties"));
 		Properties props = new Properties();
+		report.forEach((k, v) -> props.setProperty(k, v));
 
-		props.setProperty(paramName, value);
-		OutputStream os = new FileOutputStream(file);
-		props.store(os, "");
-		os.close();
+		try(OutputStream os = new FileOutputStream(file)) {
+			props.store(os, "");
+		}
+	}
+
+	public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
+		Map<String, String> report = Maps.newHashMap();
+		report.put(paramName, value);
+
+		populateOOZIEEnv(report);
 	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
index 7c5ad354d..8e0b7260d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.aggregation.common;
 public class AggregationConstants {
 
 	public static final String SEQUENCE_FILE_NAME = "/sequence_file";
+	public static final String REPORT_FILE_NAME = "/report";
 	public static final String MDSTORE_DATA_PATH = "/store";
 	public static final String MDSTORE_SIZE_PATH = "/size";
 
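Side note on the refactored `ApplicationUtils`: HDFS configuration boilerplate is now centralized in `getHadoopConfiguration`, and `populateOOZIEEnv` accepts a whole map. A minimal sketch of a caller, assuming a name node URI and the `-Doozie.action.output.properties` launcher property (neither is part of this diff):

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import com.google.common.collect.Maps;

import eu.dnetlib.dhp.application.ApplicationUtils;

public class ApplicationUtilsSketch {

	public static void main(String[] args) throws IOException {
		// one-liner replacing the HDFS boilerplate previously inlined in CollectorWorker
		final Configuration conf = ApplicationUtils.getHadoopConfiguration("hdfs://nameservice1");
		System.out.println(conf.get("fs.defaultFS"));

		// several key/value pairs now flow back to the workflow in one shot;
		// requires -Doozie.action.output.properties, normally set by the Oozie launcher
		final Map<String, String> report = Maps.newHashMap();
		report.put("records", "42");
		report.put("success", "true");
		ApplicationUtils.populateOOZIEEnv(report);
	}
}
```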
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
index 7332ac071..8dad5bb81 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
@@ -2,9 +2,14 @@
 package eu.dnetlib.dhp.aggregation.common;
 
 import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -25,16 +30,31 @@ public class AggregationUtility {
 
 	public static final ObjectMapper MAPPER = new ObjectMapper();
 
-	public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path)
+	public static void writeHdfsFile(final Configuration conf, final String content, final String path)
 		throws IOException {
-		log.info("writing size ({}) info file {}", total, path);
-		try (FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+		log.info("writing file {}, size {}", path, content.length());
+		try (FileSystem fs = FileSystem.get(conf);
 			BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
-			os.write(total.toString().getBytes(StandardCharsets.UTF_8));
+			os.write(content.getBytes(StandardCharsets.UTF_8));
 			os.flush();
 		}
+	}
 
+	public static String readHdfsFile(Configuration conf, String path) throws IOException {
+		log.info("reading file {}", path);
+
+		try (FileSystem fs = FileSystem.get(conf)) {
+			final Path p = new Path(path);
+			if (!fs.exists(p)) {
+				throw new FileNotFoundException(path);
+			}
+			return IOUtils.toString(fs.open(p));
+		}
+	}
+
+	public static <T> T readHdfsFileAs(Configuration conf, String path, Class<T> clazz) throws IOException {
+		return MAPPER.readValue(readHdfsFile(conf, path), clazz);
 	}
 
 	public static void saveDataset(final Dataset<MetadataRecord> mdstore, final String targetPath) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index fdf3965d6..5839df2d0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -28,8 +28,6 @@ import org.dom4j.io.SAXReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
@@ -47,7 +45,7 @@ public class GenerateNativeStoreSparkJob {
 				.toString(
 					GenerateNativeStoreSparkJob.class
 						.getResourceAsStream(
-							"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
+							"/eu/dnetlib/dhp/collection/generate_native_input_parameters.json")));
 		parser.parseArgument(args);
 
 		final String provenanceArgument = parser.get("provenance");
@@ -148,7 +146,9 @@ public class GenerateNativeStoreSparkJob {
 		final Long total = spark.read().load(targetPath).count();
 		log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());
 
-		writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
+		writeHdfsFile(
+			spark.sparkContext().hadoopConfiguration(), total.toString(),
+			currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
 	}
 
 	public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {
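`writeHdfsFile`/`readHdfsFile(As)` now work off a plain `Configuration`, so they are usable outside a `SparkSession`. A hypothetical round-trip, assuming a reachable HDFS behind the given name node URI:

```java
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;

import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.application.ApplicationUtils.getHadoopConfiguration;

public class HdfsFileRoundTrip {

	public static void main(String[] args) throws Exception {
		final Configuration conf = getHadoopConfiguration("hdfs://nameservice1"); // hypothetical URI

		// plain-text variant: what GenerateNativeStoreSparkJob now does with the record count
		writeHdfsFile(conf, "100", "/tmp/mdstore-test/size");
		System.out.println(readHdfsFile(conf, "/tmp/mdstore-test/size")); // prints: 100

		// JSON variant: serialize with the shared MAPPER, read back as a typed object
		final HashMap<String, String> payload = new HashMap<>();
		payload.put("success", "true");
		writeHdfsFile(conf, MAPPER.writeValueAsString(payload), "/tmp/mdstore-test/report");
		final HashMap parsed = readHdfsFileAs(conf, "/tmp/mdstore-test/report", HashMap.class);
		System.out.println(parsed.get("success"));
	}
}
```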
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
index a0c546858..d0905aade 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
@@ -4,12 +4,11 @@ package eu.dnetlib.dhp.collection.plugin;
 import java.util.stream.Stream;
 
 import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public interface CollectorPlugin {
 
-	Stream<String> collect(ApiDescriptor api) throws CollectorException;
+	Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException;
 
-	CollectorPluginErrorLogList getCollectionErrors();
 }
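With `getCollectionErrors()` gone, the report now travels through `collect()` itself. A minimal, hypothetical implementation of the revised interface (class name and record payload are illustrative only):

```java
import java.util.stream.Stream;

import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;

public class EchoCollectorPlugin implements CollectorPlugin {

	@Override
	public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
		if (api.getBaseUrl() == null) {
			// failures accumulate in the report as class name -> message pairs
			report.put(IllegalArgumentException.class.getName(), "missing baseUrl");
			throw new CollectorException("missing baseUrl");
		}
		// emit a single synthetic record, for demonstration purposes only
		return Stream.of("<record>" + api.getBaseUrl() + "</record>");
	}
}
```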
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
index ea74919c5..29e12f312 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
@@ -9,15 +9,14 @@ import java.util.Spliterators;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import org.jetbrains.annotations.NotNull;
-
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public class OaiCollectorPlugin implements CollectorPlugin {
@@ -29,19 +28,15 @@ public class OaiCollectorPlugin implements CollectorPlugin {
 
 	private OaiIteratorFactory oaiIteratorFactory;
 
-	private final CollectorPluginErrorLogList errorLogList = new CollectorPluginErrorLogList();
+	private HttpClientParams clientParams;
 
-	@Override
-	public Stream<String> collect(final ApiDescriptor api) throws CollectorException {
-		try {
-			return doCollect(api);
-		} catch (CollectorException e) {
-			errorLogList.add(e.getMessage());
-			throw e;
-		}
+	public OaiCollectorPlugin(HttpClientParams clientParams) {
+		this.clientParams = clientParams;
 	}
 
-	private Stream<String> doCollect(ApiDescriptor api) throws CollectorException {
+	@Override
+	public Stream<String> collect(final ApiDescriptor api, final CollectorPluginReport report)
+		throws CollectorException {
 		final String baseUrl = api.getBaseUrl();
 		final String mdFormat = api.getParams().get(FORMAT_PARAM);
 		final String setParam = api.getParams().get(OAI_SET_PARAM);
@@ -79,7 +74,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
 			.stream()
 			.map(
 				set -> getOaiIteratorFactory()
-					.newIterator(baseUrl, mdFormat, set, fromDate, untilDate, errorLogList))
+					.newIterator(baseUrl, mdFormat, set, fromDate, untilDate, getClientParams(), report))
 			.iterator();
 
 		return StreamSupport
@@ -94,8 +89,11 @@ public class OaiCollectorPlugin implements CollectorPlugin {
 		return oaiIteratorFactory;
 	}
 
-	@Override
-	public CollectorPluginErrorLogList getCollectionErrors() {
-		return errorLogList;
+	public HttpClientParams getClientParams() {
+		return clientParams;
+	}
+
+	public void setClientParams(HttpClientParams clientParams) {
+		this.clientParams = clientParams;
 	}
 }
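For reference, wiring the OAI plugin by hand would look roughly like the sketch below. The endpoint URL is hypothetical, the `format`/`set` parameter keys are assumptions about the values behind `FORMAT_PARAM`/`OAI_SET_PARAM`, and `ApiDescriptor` is assumed to expose plain bean setters:

```java
import java.util.HashMap;
import java.util.stream.Stream;

import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;

public class OaiCollectionSketch {

	public static void main(String[] args) throws Exception {
		final ApiDescriptor api = new ApiDescriptor();
		api.setBaseUrl("http://example.org/oai"); // hypothetical OAI-PMH endpoint
		final HashMap<String, String> params = new HashMap<>();
		params.put("format", "oai_dc"); // assumed key behind FORMAT_PARAM
		params.put("set", "driver");    // assumed key behind OAI_SET_PARAM
		api.setParams(params);

		// in-memory report: the no-arg constructor skips the HDFS-backed stream
		final CollectorPluginReport report = new CollectorPluginReport();
		final OaiCollectorPlugin plugin = new OaiCollectorPlugin(new HttpClientParams());

		try (Stream<String> records = plugin.collect(api, report)) {
			records.limit(10).forEach(System.out::println);
		}
		report.forEach((k, v) -> System.err.println(k + " -> " + v));
	}
}
```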
{}", e.getMessage()); - errorLogList.add(e.getMessage()); + errorLogList.put(e.getClass().getName(), e.getMessage()); final String cleaned = XmlCleaner.cleanAllEntities(xml); try { doc = reader.read(new StringReader(cleaned)); } catch (final DocumentException e1) { final String resumptionToken = extractResumptionToken(xml); if (resumptionToken == null) { - errorLogList.add(e1.getMessage()); + errorLogList.put(e1.getClass().getName(), e1.getMessage()); throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1); } return resumptionToken; @@ -166,15 +168,15 @@ public class OaiIterator implements Iterator { final Node errorNode = doc.selectSingleNode("/*[local-name()='OAI-PMH']/*[local-name()='error']"); if (errorNode != null) { - final String code = errorNode.valueOf("@code"); - if ("noRecordsMatch".equalsIgnoreCase(code.trim())) { + final String code = errorNode.valueOf("@code").trim(); + if ("noRecordsMatch".equalsIgnoreCase(code)) { final String msg = "noRecordsMatch for oai call : " + url; log.warn(msg); - errorLogList.add(msg); + errorLogList.put(REPORT_PREFIX + code, msg); return null; } else { final String msg = code + " - " + errorNode.getText(); - errorLogList.add(msg); + errorLogList.put(REPORT_PREFIX + "error", msg); throw new CollectorException(msg); } } @@ -186,7 +188,7 @@ public class OaiIterator implements Iterator { return doc.valueOf("//*[local-name()='resumptionToken']"); } - public CollectorPluginErrorLogList getErrorLogList() { + public CollectorPluginReport getErrorLogList() { return errorLogList; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java index d6759580f..c751a94e7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java @@ -3,7 +3,8 @@ package eu.dnetlib.dhp.collection.plugin.oai; import java.util.Iterator; -import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport; +import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams; import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2; public class OaiIteratorFactory { @@ -16,13 +17,15 @@ public class OaiIteratorFactory { final String set, final String fromDate, final String untilDate, - final CollectorPluginErrorLogList errorLogList) { - return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(), errorLogList); + final HttpClientParams clientParams, + final CollectorPluginReport errorLogList) { + return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), + errorLogList); } - private HttpConnector2 getHttpConnector() { + private HttpConnector2 getHttpConnector(HttpClientParams clientParams) { if (httpConnector == null) - httpConnector = new HttpConnector2(); + httpConnector = new HttpConnector2(clientParams); return httpConnector; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java index 9d82a1ed4..b0efd088c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java +++ 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
index 9d82a1ed4..b0efd088c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
@@ -1,14 +1,13 @@
 
 package eu.dnetlib.dhp.collection.worker;
 
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.SEQUENCE_FILE_NAME;
+import static eu.dnetlib.dhp.application.ApplicationUtils.*;
+
 import java.io.IOException;
-import java.net.URI;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import eu.dnetlib.dhp.message.Message;
-import eu.dnetlib.dhp.message.MessageSender;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -16,10 +15,14 @@ import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
+import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.dhp.message.MessageSender;
 
 public class CollectorWorker {
 
@@ -27,70 +30,71 @@ public class CollectorWorker {
 
 	private final ApiDescriptor api;
 
-	private final String hdfsuri;
+	private final Configuration conf;
 
-	private final String hdfsPath;
+	private final MDStoreVersion mdStoreVersion;
+
+	private final HttpClientParams clientParams;
+
+	private final CollectorPluginReport report;
 
 	private final MessageSender messageSender;
 
-
 	public CollectorWorker(
 		final ApiDescriptor api,
-		final String hdfsuri,
-		final String hdfsPath,
-		final MessageSender messageSender) {
+		final Configuration conf,
+		final MDStoreVersion mdStoreVersion,
+		final HttpClientParams clientParams,
+		final MessageSender messageSender,
+		final CollectorPluginReport report) {
 		this.api = api;
-		this.hdfsuri = hdfsuri;
-		this.hdfsPath = hdfsPath;
+		this.conf = conf;
+		this.mdStoreVersion = mdStoreVersion;
+		this.clientParams = clientParams;
 		this.messageSender = messageSender;
+		this.report = report;
 	}
 
-	public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
+	public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
 
-		// ====== Init HDFS File System Object
-		Configuration conf = new Configuration();
-		// Set FileSystem URI
-		conf.set("fs.defaultFS", hdfsuri);
-		// Because of Maven
-		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+		final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
+		log.info("outputPath path is {}", outputPath);
 
-		System.setProperty("hadoop.home.dir", "/");
-		// Get the filesystem - HDFS
-
-		FileSystem.get(URI.create(hdfsuri), conf);
-		Path hdfswritepath = new Path(hdfsPath);
-
-		log.info("Created path " + hdfswritepath.toString());
-
-		final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
+		final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(clientParams, api.getProtocol());
 
 		final AtomicInteger counter = new AtomicInteger(0);
 
 		try (SequenceFile.Writer writer = SequenceFile
 			.createWriter(
 				conf,
-				SequenceFile.Writer.file(hdfswritepath),
+				SequenceFile.Writer.file(new Path(outputPath)),
 				SequenceFile.Writer.keyClass(IntWritable.class),
 				SequenceFile.Writer.valueClass(Text.class))) {
 
 			final IntWritable key = new IntWritable(counter.get());
 			final Text value = new Text();
 
 			plugin
-				.collect(api)
+				.collect(api, report)
 				.forEach(
 					content -> {
 						key.set(counter.getAndIncrement());
-						if (counter.get()% 500 == 0)
+						if (counter.get() % 500 == 0)
 							messageSender.sendMessage(counter.longValue(), null);
 						value.set(content);
 						try {
 							writer.append(key, value);
-						} catch (IOException e) {
+						} catch (Throwable e) {
+							report.put(e.getClass().getName(), e.getMessage());
+							log.warn("setting report to failed");
+							report.setSuccess(false);
 							throw new RuntimeException(e);
 						}
 					});
+		} catch (Throwable e) {
+			report.put(e.getClass().getName(), e.getMessage());
+			log.warn("setting report to failed");
+			report.setSuccess(false);
 		} finally {
-			messageSender.sendMessage(counter.longValue(),counter.longValue());
-			return plugin.getCollectionErrors();
+			messageSender.sendMessage(counter.longValue(), counter.longValue());
 		}
 	}
+
 }
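The worker still emits `<IntWritable, Text>` pairs; a downstream consumer could read them back as in this sketch (name node URI and mdstore path are hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileReadSketch {

	public static void main(String[] args) throws Exception {
		final Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://nameservice1"); // hypothetical name node
		final Path path = new Path("/mdstore/v1/sequence_file"); // hypothetical mdstore version path

		try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
			final IntWritable key = new IntWritable();
			final Text value = new Text();
			// iterate the records written by CollectorWorker, in insertion order
			while (reader.next(key, value)) {
				System.out.println(key.get() + " -> " + value.getLength() + " bytes");
			}
		}
	}
}
```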
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
index d8e8a8e49..8f26074c3 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
@@ -6,22 +6,27 @@ import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
 import static eu.dnetlib.dhp.application.ApplicationUtils.*;
 
 import java.io.IOException;
+import java.util.Optional;
 
-import eu.dnetlib.dhp.message.Message;
-import eu.dnetlib.dhp.message.MessageSender;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileSystemUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
+import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.dhp.message.MessageSender;
 
 /**
  * CollectorWorkerApplication is the main class responsible to start the metadata collection process, storing the outcomes
@@ -35,19 +40,18 @@ public class CollectorWorkerApplication {
 
 	private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
 
-	public static final String COLLECTOR_WORKER_ERRORS = "collectorWorker-errors";
-
 	/**
 	 * @param args
 	 */
-	public static void main(final String[] args) throws ParseException, IOException, CollectorException {
+	public static void main(final String[] args)
+		throws ParseException, IOException, UnknownCollectorPluginException, CollectorException {
 
 		final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
 			IOUtils
 				.toString(
 					CollectorWorker.class
 						.getResourceAsStream(
-							"/eu/dnetlib/dhp/collection/collector_parameter.json")));
+							"/eu/dnetlib/dhp/collection/collector_worker_input_parameter.json")));
 		argumentParser.parseArgument(args);
 
 		final String hdfsuri = argumentParser.get("namenode");
@@ -65,21 +69,61 @@ public class CollectorWorkerApplication {
 		final String workflowId = argumentParser.get("workflowId");
 		log.info("workflowId is {}", workflowId);
 
-		final MessageSender ms = new MessageSender(dnetMessageManagerURL,workflowId);
+		final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId);
 
 		final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
-		final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
-		log.info("hdfs path is {}", hdfsPath);
+
+		final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME;
+		log.info("report path is {}", reportPath);
+
+		final HttpClientParams clientParams = getClientParams(argumentParser);
 
 		final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
+		final Configuration conf = getHadoopConfiguration(hdfsuri);
 
-		final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath, ms);
-		CollectorPluginErrorLogList errors = worker.collect();
-
-		populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString());
-
+		try (CollectorPluginReport report = new CollectorPluginReport(FileSystem.get(conf), new Path(reportPath))) {
+			final CollectorWorker worker = new CollectorWorker(api, conf, currentVersion, clientParams, ms, report);
+			worker.collect();
+			report.setSuccess(true);
+		} catch (Throwable e) {
+			log.info("got exception {}, ignoring", e.getMessage());
+		}
 	}
 
+	private static HttpClientParams getClientParams(ArgumentApplicationParser argumentParser) {
+		final HttpClientParams clientParams = new HttpClientParams();
+		clientParams
+			.setMaxNumberOfRetry(
+				Optional
+					.ofNullable(argumentParser.get("maxNumberOfRetry"))
+					.map(Integer::parseInt)
+					.orElse(HttpClientParams._maxNumberOfRetry));
+		log.info("maxNumberOfRetry is {}", clientParams.getMaxNumberOfRetry());
+
+		clientParams
+			.setRetryDelay(
+				Optional
+					.ofNullable(argumentParser.get("retryDelay"))
+					.map(Integer::parseInt)
+					.orElse(HttpClientParams._retryDelay));
+		log.info("retryDelay is {}", clientParams.getRetryDelay());
+
+		clientParams
+			.setConnectTimeOut(
+				Optional
+					.ofNullable(argumentParser.get("connectTimeOut"))
+					.map(Integer::parseInt)
+					.orElse(HttpClientParams._connectTimeOut));
+		log.info("connectTimeOut is {}", clientParams.getConnectTimeOut());
+
+		clientParams
+			.setReadTimeOut(
+				Optional
+					.ofNullable(argumentParser.get("readTimeOut"))
+					.map(Integer::parseInt)
+					.orElse(HttpClientParams._readTimeOut));
+		log.info("readTimeOut is {}", clientParams.getReadTimeOut());
+		return clientParams;
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
new file mode 100644
index 000000000..e0e402cfb
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
@@ -0,0 +1,69 @@
+
+package eu.dnetlib.dhp.collection.worker;
+
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.REPORT_FILE_NAME;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.MAPPER;
+import static eu.dnetlib.dhp.application.ApplicationUtils.getHadoopConfiguration;
+import static eu.dnetlib.dhp.application.ApplicationUtils.populateOOZIEEnv;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
+
+/**
+ * CollectorWorkerReporter
+ */
+public class CollectorWorkerReporter {
+
+	private static final Logger log = LoggerFactory.getLogger(CollectorWorkerReporter.class);
+
+	/**
+	 * @param args
+	 */
+	public static void main(final String[] args) throws IOException, ParseException, CollectorException {
+
+		final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					CollectorWorker.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/collection/collector_reporter_input_parameter.json")));
+		argumentParser.parseArgument(args);
+
+		final String nameNode = argumentParser.get("namenode");
+		log.info("nameNode is {}", nameNode);
+
+		final String mdStoreVersion = argumentParser.get("mdStoreVersion");
+		log.info("mdStoreVersion is {}", mdStoreVersion);
+
+		final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
+
+		final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME;
+		log.info("report path is {}", reportPath);
+
+		final Configuration conf = getHadoopConfiguration(nameNode);
+		CollectorPluginReport report = readHdfsFileAs(conf, reportPath, CollectorPluginReport.class);
+		if (Objects.isNull(report)) {
+			throw new CollectorException("collection report is NULL");
+		}
+		log.info("report success: {}, size: {}", report.isSuccess(), report.size());
+		report.forEach((k, v) -> log.info("{} - {}", k, v));
+		if (!report.isSuccess()) {
+			throw new CollectorException("collection report indicates a failure");
+		}
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginErrorLogList.java
deleted file mode 100644
index dcaf0ea56..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginErrorLogList.java
+++ /dev/null
@@ -1,19 +0,0 @@
-
-package eu.dnetlib.dhp.collection.worker.utils;
-
-import java.util.LinkedList;
-
-public class CollectorPluginErrorLogList extends LinkedList<String> {
-
-	private static final long serialVersionUID = -6925786561303289704L;
-
-	@Override
-	public String toString() {
-		String log = "";
-		int index = 0;
-		for (final String errorMessage : this) {
-			log += String.format("Retry #%s: %s / ", index++, errorMessage);
-		}
-		return log;
-	}
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
index 7cbcd9b5c..ab7dad077 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
@@ -7,14 +7,15 @@ import eu.dnetlib.dhp.collection.worker.CollectorException;
 
 public class CollectorPluginFactory {
 
-	public static CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
+	public static CollectorPlugin getPluginByProtocol(final HttpClientParams clientParams, final String protocol)
+		throws UnknownCollectorPluginException {
 		if (protocol == null)
-			throw new CollectorException("protocol cannot be null");
+			throw new UnknownCollectorPluginException("protocol cannot be null");
 		switch (protocol.toLowerCase().trim()) {
 			case "oai":
-				return new OaiCollectorPlugin();
+				return new OaiCollectorPlugin(clientParams);
 			default:
-				throw new CollectorException("UNknown protocol");
+				throw new UnknownCollectorPluginException("Unknown protocol");
 		}
 	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginReport.java
new file mode 100644
index 000000000..b7bf539dc
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginReport.java
@@ -0,0 +1,64 @@
+
+package eu.dnetlib.dhp.collection.worker.utils;
+
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.MAPPER;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import eu.dnetlib.dhp.application.ApplicationUtils;
+
+public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
+
+	private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class);
+
+	@JsonIgnore
+	private FileSystem fs;
+
+	@JsonIgnore
+	private Path path;
+
+	@JsonIgnore
+	private FSDataOutputStream fos;
+
+	public static String SUCCESS = "success";
+
+	public CollectorPluginReport() {
+	}
+
+	public CollectorPluginReport(FileSystem fs, Path path) throws IOException {
+		this.fs = fs;
+		this.path = path;
+
+		this.fos = fs.create(path);
+	}
+
+	public Boolean isSuccess() {
+		return Boolean.valueOf(get(SUCCESS));
+	}
+
+	public void setSuccess(Boolean success) {
+		put(SUCCESS, String.valueOf(success));
+	}
+
+	@Override
+	public void close() throws IOException {
+		final String data = MAPPER.writeValueAsString(this);
+		if (Objects.nonNull(fos)) {
+			log.info("writing report {} to {}", data, path.toString());
+			IOUtils.write(data, fos);
+			ApplicationUtils.populateOOZIEEnv(this);
+		}
+	}
+}
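The report is both a `LinkedHashMap` and a `Closeable`: entries accumulate during collection and `close()` persists them. A sketch of the lifecycle, assuming a reachable HDFS; note that `close()` also calls `populateOOZIEEnv`, which expects the Oozie launcher property to be set:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;

public class ReportLifecycleSketch {

	public static void main(String[] args) throws Exception {
		final Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://nameservice1"); // hypothetical name node

		try (CollectorPluginReport report = new CollectorPluginReport(
			FileSystem.get(conf), new Path("/mdstore/v1/report"))) { // hypothetical report path
			report.put("oai:noRecordsMatch", "noRecordsMatch for oai call : ...");
			report.setSuccess(false);
		}
		// on close() the map is serialized as JSON to the report path and
		// mirrored into the Oozie action output properties
	}
}
```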
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpClientParams.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpClientParams.java
new file mode 100644
index 000000000..e77f3680f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpClientParams.java
@@ -0,0 +1,62 @@
+
+package eu.dnetlib.dhp.collection.worker.utils;
+
+/**
+ * Bundles the http connection parameters driving the client behaviour.
+ */
+public class HttpClientParams {
+
+	public static int _maxNumberOfRetry = 3;
+	public static int _retryDelay = 10; // seconds
+	public static int _connectTimeOut = 10; // seconds
+	public static int _readTimeOut = 30; // seconds
+
+	private int maxNumberOfRetry;
+	private int retryDelay;
+	private int connectTimeOut;
+	private int readTimeOut;
+
+	public HttpClientParams() {
+		this(_maxNumberOfRetry, _retryDelay, _connectTimeOut, _readTimeOut);
+	}
+
+	public HttpClientParams(int maxNumberOfRetry, int retryDelay, int connectTimeOut, int readTimeOut) {
+		this.maxNumberOfRetry = maxNumberOfRetry;
+		this.retryDelay = retryDelay;
+		this.connectTimeOut = connectTimeOut;
+		this.readTimeOut = readTimeOut;
+	}
+
+	public int getMaxNumberOfRetry() {
+		return maxNumberOfRetry;
+	}
+
+	public void setMaxNumberOfRetry(int maxNumberOfRetry) {
+		this.maxNumberOfRetry = maxNumberOfRetry;
+	}
+
+	public int getRetryDelay() {
+		return retryDelay;
+	}
+
+	public void setRetryDelay(int retryDelay) {
+		this.retryDelay = retryDelay;
+	}
+
+	public void setConnectTimeOut(int connectTimeOut) {
+		this.connectTimeOut = connectTimeOut;
+	}
+
+	public int getConnectTimeOut() {
+		return connectTimeOut;
+	}
+
+	public int getReadTimeOut() {
+		return readTimeOut;
+	}
+
+	public void setReadTimeOut(int readTimeOut) {
+		this.readTimeOut = readTimeOut;
+	}
+
+}
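Client tuning now goes through `HttpClientParams` instead of setters on the connector. A minimal sketch, with a hypothetical URL:

```java
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;

public class HttpClientParamsSketch {

	public static void main(String[] args) throws Exception {
		// defaults: 3 retries, 10s retry delay, 10s connect timeout, 30s read timeout
		final HttpConnector2 connector = new HttpConnector2();

		// a more patient configuration for a slow remote host
		final HttpClientParams patient = new HttpClientParams(5, 30, 20, 120);
		final String xml = new HttpConnector2(patient)
			.getInputSource("http://example.org/oai?verb=Identify"); // hypothetical URL
		System.out.println(xml.length());
	}
}
```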
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
deleted file mode 100644
index 39c2371b9..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
+++ /dev/null
@@ -1,218 +0,0 @@
-
-package eu.dnetlib.dhp.collection.worker.utils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.*;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.math.NumberUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-
-@Deprecated
-public class HttpConnector {
-
-	private static final Logger log = LoggerFactory.getLogger(HttpConnector.class);
-
-	private int maxNumberOfRetry = 6;
-	private int defaultDelay = 120; // seconds
-	private int readTimeOut = 120; // seconds
-
-	private String responseType = null;
-
-	private final String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
-
-	public HttpConnector() {
-		CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
-	}
-
-	/**
-	 * Given the URL returns the content via HTTP GET
-	 *
-	 * @param requestUrl the URL
-	 * @return the content of the downloaded resource
-	 * @throws CollectorException when retrying more than maxNumberOfRetry times
-	 */
-	public String getInputSource(final String requestUrl) throws CollectorException {
-		return attemptDownloadAsString(requestUrl, 1, new CollectorPluginErrorLogList());
-	}
-
-	/**
-	 * Given the URL returns the content via HTTP GET
-	 *
-	 * @param requestUrl the URL
-	 * @param errorLogList the list of errors
-	 * @return the content of the downloaded resource
-	 * @throws CollectorException when retrying more than maxNumberOfRetry times
-	 */
-	public String getInputSource(final String requestUrl, CollectorPluginErrorLogList errorLogList)
-		throws CollectorException {
-		return attemptDownloadAsString(requestUrl, 1, errorLogList);
-	}
-
-	/**
-	 * Given the URL returns the content as a stream via HTTP GET
-	 *
-	 * @param requestUrl the URL
-	 * @return the content of the downloaded resource as InputStream
-	 * @throws CollectorException when retrying more than maxNumberOfRetry times
-	 */
-	public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException {
-		return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-	}
-
-	private String attemptDownloadAsString(
-		final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
-		throws CollectorException {
-
-		log.info("requesting URL [{}]", requestUrl);
-		try {
-			final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-			try {
-				return IOUtils.toString(s);
-			} catch (final IOException e) {
-				log.error("error while retrieving from http-connection occurred: {}", requestUrl, e);
-				Thread.sleep(defaultDelay * 1000);
-				errorList.add(e.getMessage());
-				return attemptDownloadAsString(requestUrl, retryNumber + 1, errorList);
-			} finally {
-				IOUtils.closeQuietly(s);
-			}
-		} catch (final InterruptedException e) {
-			throw new CollectorException(e);
-		}
-	}
-
-	private InputStream attemptDownload(
-		final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList)
-		throws CollectorException {
-
-		if (retryNumber > maxNumberOfRetry) {
-			throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList);
-		}
-
-		log.debug("requesting URL [{}], try {}", requestUrl, retryNumber);
-		try {
-			InputStream input = null;
-
-			try {
-				final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
-				urlConn.setInstanceFollowRedirects(false);
-				urlConn.setReadTimeout(readTimeOut * 1000);
-				urlConn.addRequestProperty("User-Agent", userAgent);
-
-				if (log.isDebugEnabled()) {
-					logHeaderFields(urlConn);
-				}
-
-				final int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
-				if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
-					log.warn("waiting and repeating request after {} sec.", retryAfter);
-					Thread.sleep(retryAfter * 1000);
-					errorList.add("503 Service Unavailable");
-					urlConn.disconnect();
-					return attemptDownload(requestUrl, retryNumber + 1, errorList);
-				} else if (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM
-					|| urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP) {
-					final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
-					log.debug("The requested url has been moved to {}", newUrl);
-					errorList
-						.add(
-							String
-								.format(
-									"%s %s. Moved to: %s",
-									urlConn.getResponseCode(), urlConn.getResponseMessage(), newUrl));
-					urlConn.disconnect();
-					return attemptDownload(newUrl, retryNumber + 1, errorList);
-				} else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-					final String msg = String
-						.format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage());
-					log.error(msg);
-					Thread.sleep(defaultDelay * 1000);
-					errorList.add(msg);
-					urlConn.disconnect();
-					return attemptDownload(requestUrl, retryNumber + 1, errorList);
-				} else {
-					input = urlConn.getInputStream();
-					responseType = urlConn.getContentType();
-					return input;
-				}
-			} catch (final IOException e) {
-				log.error("error while retrieving from http-connection occurred: {}", requestUrl, e);
-				Thread.sleep(defaultDelay * 1000);
-				errorList.add(e.getMessage());
-				return attemptDownload(requestUrl, retryNumber + 1, errorList);
-			}
-		} catch (final InterruptedException e) {
-			throw new CollectorException(e);
-		}
-	}
-
-	private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
-		log.debug("StatusCode: {}", urlConn.getResponseMessage());
-
-		for (final Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
-			if (e.getKey() != null) {
-				for (final String v : e.getValue()) {
-					log.debug("  key: {}  value: {}", e.getKey(), v);
-				}
-			}
-		}
-	}
-
-	private int obtainRetryAfter(final Map<String, List<String>> headerMap) {
-		for (final String key : headerMap.keySet()) {
-			if (key != null
-				&& key.toLowerCase().equals("retry-after")
-				&& headerMap.get(key).size() > 0
-				&& NumberUtils.isNumber(headerMap.get(key).get(0))) {
-				return Integer.parseInt(headerMap.get(key).get(0)) + 10;
-			}
-		}
-		return -1;
-	}
-
-	private String obtainNewLocation(final Map<String, List<String>> headerMap)
-		throws CollectorException {
-		for (final String key : headerMap.keySet()) {
-			if (key != null && key.toLowerCase().equals("location") && headerMap.get(key).size() > 0) {
-				return headerMap.get(key).get(0);
-			}
-		}
-		throw new CollectorException(
-			"The requested url has been MOVED, but 'location' param is MISSING");
-	}
-
-	public int getMaxNumberOfRetry() {
-		return maxNumberOfRetry;
-	}
-
-	public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
-		this.maxNumberOfRetry = maxNumberOfRetry;
-	}
-
-	public int getDefaultDelay() {
-		return defaultDelay;
-	}
-
-	public void setDefaultDelay(final int defaultDelay) {
-		this.defaultDelay = defaultDelay;
-	}
-
-	public int getReadTimeOut() {
-		return readTimeOut;
-	}
-
-	public void setReadTimeOut(final int readTimeOut) {
-		this.readTimeOut = readTimeOut;
-	}
-
-	public String getResponseType() {
-		return responseType;
-	}
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java
index b316f34ed..68b1ef8ad 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java
@@ -1,24 +1,17 @@
 
 package eu.dnetlib.dhp.collection.worker.utils;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.*;
-import java.security.GeneralSecurityException;
-import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.Map;
 
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.http.HttpHeaders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.collection.worker.CollectorException;
 
@@ -29,162 +22,151 @@ import eu.dnetlib.dhp.collection.worker.CollectorException;
  */
 public class HttpConnector2 {
 
-	private static final Log log = LogFactory.getLog(HttpConnector.class);
+	private static final Logger log = LoggerFactory.getLogger(HttpConnector2.class);
 
-	private int maxNumberOfRetry = 6;
-	private int defaultDelay = 120; // seconds
-	private int readTimeOut = 120; // seconds
+	private static final String REPORT_PREFIX = "http:";
+
+	private HttpClientParams clientParams;
 
 	private String responseType = null;
 
 	private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
 
 	public HttpConnector2() {
+		this(new HttpClientParams());
+	}
+
+	public HttpConnector2(HttpClientParams clientParams) {
+		this.clientParams = clientParams;
 		CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
 	}
 
 	/**
-	 * @see HttpConnector2#getInputSource(java.lang.String, eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList)
-	 */
-	public String getInputSource(final String requestUrl) throws CollectorException {
-		return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
-	}
-
-	/**
-	 * @see HttpConnector2#getInputSource(java.lang.String, eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList)
+	 * @see HttpConnector2#getInputSource(java.lang.String, CollectorPluginReport)
 	 */
 	public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException {
 		return IOUtils.toInputStream(getInputSource(requestUrl));
 	}
 
+	/**
+	 * @see HttpConnector2#getInputSource(java.lang.String, CollectorPluginReport)
+	 */
+	public String getInputSource(final String requestUrl) throws CollectorException {
+		return attemptDownloadAsString(requestUrl, 1, new CollectorPluginReport());
+	}
+
 	/**
 	 * Given the URL returns the content via HTTP GET
 	 *
 	 * @param requestUrl the URL
-	 * @param errorLogList the list of errors
+	 * @param report the list of errors
 	 * @return the content of the downloaded resource
 	 * @throws CollectorException when retrying more than maxNumberOfRetry times
 	 */
-	public String getInputSource(final String requestUrl, CollectorPluginErrorLogList errorLogList)
+	public String getInputSource(final String requestUrl, CollectorPluginReport report)
 		throws CollectorException {
-		return attemptDownlaodAsString(requestUrl, 1, errorLogList);
+		return attemptDownloadAsString(requestUrl, 1, report);
 	}
 
-	private String attemptDownlaodAsString(final String requestUrl, final int retryNumber,
-		final CollectorPluginErrorLogList errorList)
-		throws CollectorException {
-		try {
-			InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-			try {
-				return IOUtils.toString(s);
-			} catch (IOException e) {
-				log.error("error while retrieving from http-connection occured: " + requestUrl, e);
-				Thread.sleep(defaultDelay * 1000);
-				errorList.add(e.getMessage());
-				return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
-			} finally {
-				IOUtils.closeQuietly(s);
-			}
-		} catch (InterruptedException e) {
-			throw new CollectorException(e);
-		}
-	}
-
+	private String attemptDownloadAsString(final String requestUrl, final int retryNumber,
+		final CollectorPluginReport report) throws CollectorException {
+
+		try (InputStream s = attemptDownload(requestUrl, retryNumber, report)) {
+			return IOUtils.toString(s);
+		} catch (IOException e) {
+			log.error(e.getMessage(), e);
 			throw new CollectorException(e);
 		}
 	}
 
 	private InputStream attemptDownload(final String requestUrl, final int retryNumber,
-		final CollectorPluginErrorLogList errorList)
-		throws CollectorException {
+		final CollectorPluginReport report) throws CollectorException, IOException {
 
-		if (retryNumber > maxNumberOfRetry) {
-			throw new CollectorException("Max number of retries exceeded. Cause: \n " + errorList);
+		if (retryNumber > getClientParams().getMaxNumberOfRetry()) {
+			throw new CollectorException("Max number of retries exceeded. Cause: \n " + report);
 		}
 
-		log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
+		log.info("Downloading attempt {} [{}]", retryNumber, requestUrl);
+
+		InputStream input = null;
+
 		try {
-			InputStream input = null;
+			final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
+			urlConn.setInstanceFollowRedirects(false);
+			urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000);
+			urlConn.setConnectTimeout(getClientParams().getConnectTimeOut() * 1000);
+			urlConn.addRequestProperty(HttpHeaders.USER_AGENT, userAgent);
 
-			try {
-				final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
-				urlConn.setInstanceFollowRedirects(false);
-				urlConn.setReadTimeout(readTimeOut * 1000);
-				urlConn.addRequestProperty("User-Agent", userAgent);
-
-				if (log.isDebugEnabled()) {
-					logHeaderFields(urlConn);
-				}
-
-				int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
-				if (is2xx(urlConn.getResponseCode())) {
-					input = urlConn.getInputStream();
-					responseType = urlConn.getContentType();
-					return input;
-				}
-				if (is3xx(urlConn.getResponseCode())) {
-					// REDIRECTS
-					final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
-					log.debug(String.format("The requested url %s has been moved to %s", requestUrl, newUrl));
-					errorList
-						.add(
-							String
-								.format(
-									"%s %s %s. Moved to: %s", requestUrl, urlConn.getResponseCode(),
-									urlConn.getResponseMessage(), newUrl));
-					urlConn.disconnect();
-					if (retryAfter > 0)
-						Thread.sleep(retryAfter * 1000);
-					return attemptDownload(newUrl, retryNumber + 1, errorList);
-				}
-				if (is4xx(urlConn.getResponseCode())) {
-					// CLIENT ERROR, DO NOT RETRY
-					errorList
-						.add(
-							String
-								.format(
-									"%s error %s: %s", requestUrl, urlConn.getResponseCode(),
-									urlConn.getResponseMessage()));
" + errorList); - } - if (is5xx(urlConn.getResponseCode())) { - // SERVER SIDE ERRORS RETRY ONLY on 503 - switch (urlConn.getResponseCode()) { - case HttpURLConnection.HTTP_UNAVAILABLE: - if (retryAfter > 0) { - log - .warn( - requestUrl + " - waiting and repeating request after suggested retry-after " - + retryAfter + " sec."); - Thread.sleep(retryAfter * 1000); - } else { - log - .warn( - requestUrl + " - waiting and repeating request after default delay of " - + defaultDelay + " sec."); - Thread.sleep(defaultDelay * 1000); - } - errorList.add(requestUrl + " 503 Service Unavailable"); - urlConn.disconnect(); - return attemptDownload(requestUrl, retryNumber + 1, errorList); - default: - errorList - .add( - String - .format( - "%s Error %s: %s", requestUrl, urlConn.getResponseCode(), - urlConn.getResponseMessage())); - throw new CollectorException(urlConn.getResponseCode() + " error " + errorList); - } - } - throw new CollectorException( - String.format("Unexpected status code: %s error %s", urlConn.getResponseCode(), errorList)); - } catch (MalformedURLException | NoRouteToHostException e) { - errorList.add(String.format("Error: %s for request url: %s", e.getCause(), requestUrl)); - throw new CollectorException(e + "error " + errorList); - } catch (IOException e) { - Thread.sleep(defaultDelay * 1000); - errorList.add(requestUrl + " " + e.getMessage()); - return attemptDownload(requestUrl, retryNumber + 1, errorList); + if (log.isDebugEnabled()) { + logHeaderFields(urlConn); } - } catch (InterruptedException e) { - throw new CollectorException(e); + + int retryAfter = obtainRetryAfter(urlConn.getHeaderFields()); + if (is2xx(urlConn.getResponseCode())) { + input = urlConn.getInputStream(); + responseType = urlConn.getContentType(); + return input; + } + if (is3xx(urlConn.getResponseCode())) { + // REDIRECTS + final String newUrl = obtainNewLocation(urlConn.getHeaderFields()); + log.info(String.format("The requested url has been moved to %s", newUrl)); + report + .put( + REPORT_PREFIX + urlConn.getResponseCode(), + String.format("Moved to: %s", newUrl)); + urlConn.disconnect(); + if (retryAfter > 0) { + backoffAndSleep(retryAfter); + } + return attemptDownload(newUrl, retryNumber + 1, report); + } + if (is4xx(urlConn.getResponseCode())) { + // CLIENT ERROR, DO NOT RETRY + report + .put( + REPORT_PREFIX + urlConn.getResponseCode(), + String + .format( + "%s error: %s", requestUrl, urlConn.getResponseMessage())); + throw new CollectorException("4xx error: request will not be repeated. 
" + report); + } + if (is5xx(urlConn.getResponseCode())) { + // SERVER SIDE ERRORS RETRY ONLY on 503 + switch (urlConn.getResponseCode()) { + case HttpURLConnection.HTTP_UNAVAILABLE: + if (retryAfter > 0) { + log + .warn( + requestUrl + " - waiting and repeating request after suggested retry-after " + + retryAfter + " sec."); + backoffAndSleep(retryAfter * 1000); + } else { + log + .warn( + requestUrl + " - waiting and repeating request after default delay of " + + getClientParams().getRetryDelay() + " sec."); + backoffAndSleep(retryNumber * getClientParams().getRetryDelay() * 1000); + } + report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl); + urlConn.disconnect(); + return attemptDownload(requestUrl, retryNumber + 1, report); + default: + report + .put( + REPORT_PREFIX + urlConn.getResponseCode(), + String + .format( + "%s Error: %s", requestUrl, urlConn.getResponseMessage())); + throw new CollectorException(urlConn.getResponseCode() + " error " + report); + } + } + throw new CollectorException( + String.format("Unexpected status code: %s error %s", urlConn.getResponseCode(), report)); + } catch (MalformedURLException | SocketException | UnknownHostException e) { + log.error(e.getMessage(), e); + report.put(e.getClass().getName(), e.getMessage()); + throw new CollectorException(e.getMessage(), e); } } @@ -200,12 +182,21 @@ public class HttpConnector2 { } } + private void backoffAndSleep(int sleepTime) throws CollectorException { + log.info("I'm going to sleep for {}ms", sleepTime); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + throw new CollectorException(e); + } + } + private int obtainRetryAfter(final Map> headerMap) { for (String key : headerMap.keySet()) { - if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0) + if ((key != null) && key.equalsIgnoreCase(HttpHeaders.RETRY_AFTER) && (headerMap.get(key).size() > 0) && NumberUtils.isCreatable(headerMap.get(key).get(0))) { - return Integer - .parseInt(headerMap.get(key).get(0)) + 10; + return Integer.parseInt(headerMap.get(key).get(0)) + 10; } } return -1; @@ -213,44 +204,13 @@ public class HttpConnector2 { private String obtainNewLocation(final Map> headerMap) throws CollectorException { for (String key : headerMap.keySet()) { - if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) { + if ((key != null) && key.equalsIgnoreCase(HttpHeaders.LOCATION) && (headerMap.get(key).size() > 0)) { return headerMap.get(key).get(0); } } throw new CollectorException("The requested url has been MOVED, but 'location' param is MISSING"); } - /** - * register for https scheme; this is a workaround and not intended for the use in trusted environments - */ - public void initTrustManager() { - final X509TrustManager tm = new X509TrustManager() { - - @Override - public void checkClientTrusted(final X509Certificate[] xcs, final String string) { - } - - @Override - public void checkServerTrusted(final X509Certificate[] xcs, final String string) { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return null; - } - }; - try { - final SSLContext ctx = SSLContext.getInstance("TLS"); - ctx.init(null, new TrustManager[] { - tm - }, null); - HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); - } catch (GeneralSecurityException e) { - log.fatal(e); - throw new IllegalStateException(e); - } - } - private boolean is2xx(final int statusCode) { return statusCode >= 200 && 
 		return statusCode >= 200 && statusCode <= 299;
 	}
@@ -267,32 +227,15 @@ public class HttpConnector2 {
 		return statusCode >= 500 && statusCode <= 599;
 	}
 
-	public int getMaxNumberOfRetry() {
-		return maxNumberOfRetry;
-	}
-
-	public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
-		this.maxNumberOfRetry = maxNumberOfRetry;
-	}
-
-	public int getDefaultDelay() {
-		return defaultDelay;
-	}
-
-	public void setDefaultDelay(final int defaultDelay) {
-		this.defaultDelay = defaultDelay;
-	}
-
-	public int getReadTimeOut() {
-		return readTimeOut;
-	}
-
-	public void setReadTimeOut(final int readTimeOut) {
-		this.readTimeOut = readTimeOut;
-	}
-
 	public String getResponseType() {
 		return responseType;
 	}
 
+	public HttpClientParams getClientParams() {
+		return clientParams;
+	}
+
+	public void setClientParams(HttpClientParams clientParams) {
+		this.clientParams = clientParams;
+	}
 }
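Worth noting: without a `Retry-After` hint, the 503 branch now sleeps `retryNumber * retryDelay` seconds, i.e. a linear backoff instead of the old fixed `defaultDelay`. A toy rendering of that schedule, using the default delay:

```java
public class LinearBackoffSketch {

	public static void main(String[] args) {
		final int retryDelay = 10; // seconds, the HttpClientParams._retryDelay default
		for (int retryNumber = 1; retryNumber <= 3; retryNumber++) {
			// mirrors the sleepTime handed to backoffAndSleep() in the 503 branch
			System.out.println("attempt " + retryNumber + ": sleep " + retryNumber * retryDelay * 1000L + " ms");
		}
		// prints 10000, 20000, 30000 ms: each failed attempt waits a little longer
	}
}
```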
"paramName": "mv", + "paramLongName": "mdStoreVersion", + "paramDescription": "the MDStore Version bean", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_worker_input_parameter.json similarity index 52% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_worker_input_parameter.json index 6ccba468a..f3eaf2d71 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_worker_input_parameter.json @@ -23,12 +23,34 @@ "paramDescription": "the End point URL to send Messages", "paramRequired": true }, - - { "paramName": "w", "paramLongName": "workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true + }, + { + "paramName": "mr", + "paramLongName": "maxNumberOfRetry", + "paramDescription": "the maximum number of admitted connection retries", + "paramRequired": false + }, + { + "paramName": "rd", + "paramLongName": "retryDelay", + "paramDescription": "the delay (ms) between retries", + "paramRequired": false + }, + { + "paramName": "ct", + "paramLongName": "connectTimeOut", + "paramDescription": "the maximum allowed time (ms) to connect to the remote host", + "paramRequired": false + }, + { + "paramName": "rt", + "paramLongName": "readTimeOut", + "paramDescription": "the maximum allowed time (ms) to receive content from the remote host", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/generate_native_input_parameters.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/generate_native_input_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml index b74ef6b61..e7f6b9201 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml @@ -94,9 +94,22 @@ --workflowId${workflowId} --dnetMessageManagerURL${dnetMessageManagerURL} --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} - + --maxNumberOfRetry${maxNumberOfRetry} + --retryDelay${retryDelay} + --connectTimeOut${connectTimeOut} + --readTimeOut${readTimeOut} + + + + + + + eu.dnetlib.dhp.collection.worker.CollectorWorkerReporter + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --namenode${nameNode} + diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java deleted file mode 100644 index 103e11c33..000000000 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
deleted file mode 100644
index 103e11c33..000000000
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
-
-@Disabled
-public class HttpConnectorTest {
-
-    private static final Log log = LogFactory.getLog(HttpConnectorTest.class);
-    private static HttpConnector2 connector;
-
-    private static final String URL = "http://cordis.europa.eu/data/reference/cordisref-H2020topics.xlsx";
-    private static final String URL_MISCONFIGURED_SERVER = "https://www.alexandria.unisg.ch/cgi/oai2?verb=Identify";
-    private static final String URL_GOODSNI_SERVER = "https://air.unimi.it/oai/openaire?verb=Identify";
-
-    private static final SSLContextBuilder sslContextBuilder = new SSLContextBuilder();
-    private static SSLConnectionSocketFactory sslSocketFactory;
-
-    @BeforeAll
-    public static void setUp() {
-        connector = new HttpConnector2();
-    }
-
-    @Test
-
-    public void testGetInputSource() throws CollectorException {
-        System.out.println(connector.getInputSource(URL));
-    }
-
-    @Test
-    public void testGoodServers() throws CollectorException {
-        System.out.println(connector.getInputSource(URL_GOODSNI_SERVER));
-    }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
index 84cad8e19..65c2833eb 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.collection.worker.CollectorWorker;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 @Disabled
@@ -21,8 +22,9 @@ public class CollectorWorkerApplicationTests {
 
    @Test
    public void testFindPlugin() throws Exception {
        final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
-        assertNotNull(collectorPluginEnumerator.getPluginByProtocol("oai"));
-        assertNotNull(collectorPluginEnumerator.getPluginByProtocol("OAI"));
+        final HttpClientParams clientParams = new HttpClientParams();
+        assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "oai"));
+        assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "OAI"));
    }
 
    @Test
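The updated test exercises a case-insensitive protocol lookup that now also receives the HttpClientParams. A simplified sketch of that lookup contract, with the plugin resolution reduced to a String so the snippet stays self-contained (the real factory returns plugin instances); unrecognized protocols surface through the dedicated exception introduced above:

import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;

public class ProtocolResolutionSketch {

    // a plain String stands in for the resolved plugin in this illustration
    static String resolve(final String protocol) throws UnknownCollectorPluginException {
        if (protocol == null) {
            throw new UnknownCollectorPluginException("protocol cannot be null");
        }
        switch (protocol.toLowerCase().trim()) {
            case "oai":
                return "OaiCollectorPlugin";
            default:
                throw new UnknownCollectorPluginException("Unknown protocol: " + protocol);
        }
    }
}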
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
new file mode 100644
index 000000000..69376d5eb
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
@@ -0,0 +1,29 @@
+
+package eu.dnetlib.dhp.collector.worker.utils;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+
+public class CollectorPluginReportTest {
+
+    @Test
+    public void testSerialize() throws IOException {
+        CollectorPluginReport r1 = new CollectorPluginReport();
+        r1.put("a", "b");
+        r1.setSuccess(true);
+
+        String s = AggregationUtility.MAPPER.writeValueAsString(r1);
+
+        Assertions.assertNotNull(s);
+
+        CollectorPluginReport r2 = AggregationUtility.MAPPER.readValue(s, CollectorPluginReport.class);
+
+        Assertions.assertTrue(r2.isSuccess(), "should be true");
+    }
+
+}
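The round-trip asserted by this test is what allows a separate reporter step, such as the CollectorWorkerReporter wired into the workflow above, to read the report back from HDFS and decide the outcome of the collection. A sketch of such a check, assuming the report sits under the mdstore version path with the REPORT_FILE_NAME suffix:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import eu.dnetlib.dhp.aggregation.common.AggregationConstants;
import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;

public class ReportCheckSketch {

    // deserializes the report via the shared Jackson MAPPER and fails on unsuccessful runs
    static void verify(final Configuration conf, final String mdStoreVersionPath)
        throws IOException, CollectorException {
        final CollectorPluginReport report = AggregationUtility
            .readHdfsFileAs(
                conf, mdStoreVersionPath + AggregationConstants.REPORT_FILE_NAME, CollectorPluginReport.class);
        if (!report.isSuccess()) {
            throw new CollectorException("collection failed: " + report);
        }
    }
}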