diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
index c78fb1b1f..c53b83561 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java
@@ -11,34 +11,4 @@ import com.google.common.collect.Maps;
 
 public class ApplicationUtils {
 
-	public static Configuration getHadoopConfiguration(String nameNode) {
-		// ====== Init HDFS File System Object
-		Configuration conf = new Configuration();
-		// Set FileSystem URI
-		conf.set("fs.defaultFS", nameNode);
-		// Because of Maven
-		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
-
-		System.setProperty("hadoop.home.dir", "/");
-		return conf;
-	}
-
-	public static void populateOOZIEEnv(final Map<String, String> report) throws IOException {
-		File file = new File(System.getProperty("oozie.action.output.properties"));
-		Properties props = new Properties();
-		report.forEach((k, v) -> props.setProperty(k, v));
-
-		try(OutputStream os = new FileOutputStream(file)) {
-			props.store(os, "");
-		}
-	}
-
-	public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
-		Map<String, String> report = Maps.newHashMap();
-		report.put(paramName, value);
-
-		populateOOZIEEnv(report);
-	}
-
 }
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
index 8872174a5..8d760a2cd 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java
@@ -1,18 +1,29 @@
 
 package eu.dnetlib.dhp.utils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.binary.Base64OutputStream;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SaveMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
 import com.jayway.jsonpath.JsonPath;
 
 import net.minidev.json.JSONArray;
@@ -21,6 +32,8 @@ import scala.collection.Seq;
 
 public class DHPUtils {
 
+	private static final Logger log = LoggerFactory.getLogger(DHPUtils.class);
+
 	public static Seq<String> toSeq(List<String> list) {
 		return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
 	}
@@ -79,4 +92,72 @@ public class DHPUtils {
 			return "";
 		}
 	}
+
+	public static final ObjectMapper MAPPER = new ObjectMapper();
+
+	public static void writeHdfsFile(final Configuration conf, final String content, final String path)
+		throws IOException {
+
+		log.info("writing file {}, size {}", path, content.length());
+		try (FileSystem fs = FileSystem.get(conf);
+			BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
+			os.write(content.getBytes(StandardCharsets.UTF_8));
+			os.flush();
+		}
+	}
+
+	public static String readHdfsFile(Configuration conf, String path) throws IOException {
+		log.info("reading file {}", path);
+
+		try (FileSystem fs = FileSystem.get(conf)) {
+			final Path p = new Path(path);
+			if (!fs.exists(p)) {
+				throw new FileNotFoundException(path);
+			}
+			return IOUtils.toString(fs.open(p));
+		}
+	}
+
+	public static <T> T readHdfsFileAs(Configuration conf, String path, Class<T> clazz) throws IOException {
+		return MAPPER.readValue(readHdfsFile(conf, path), clazz);
+	}
+
+	public static void saveDataset(final Dataset<?> mdstore, final String targetPath) {
+		log.info("saving dataset in: {}", targetPath);
+		mdstore
+			.write()
+			.mode(SaveMode.Overwrite)
+			.format("parquet")
+			.save(targetPath);
+	}
+
+	public static Configuration getHadoopConfiguration(String nameNode) {
+		// ====== Init HDFS File System Object
+		Configuration conf = new Configuration();
+		// Set FileSystem URI
+		conf.set("fs.defaultFS", nameNode);
+		// Because of Maven
+		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+		System.setProperty("hadoop.home.dir", "/");
+		return conf;
+	}
+
+	public static void populateOOZIEEnv(final Map<String, String> report) throws IOException {
+		File file = new File(System.getProperty("oozie.action.output.properties"));
+		Properties props = new Properties();
+		report.forEach((k, v) -> props.setProperty(k, v));
+
+		try (OutputStream os = new FileOutputStream(file)) {
+			props.store(os, "");
+		}
+	}
+
+	public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
+		Map<String, String> report = Maps.newHashMap();
+		report.put(paramName, value);
+
+		populateOOZIEEnv(report);
+	}
 }
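Note: a minimal usage sketch of the helpers consolidated into DHPUtils by this change. The nameNode URI and HDFS paths below are hypothetical placeholders, not values from this changeset.

    import org.apache.hadoop.conf.Configuration;
    import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
    import eu.dnetlib.dhp.utils.DHPUtils;

    public class DHPUtilsUsageSketch {
        public static void main(String[] args) throws Exception {
            // build a Configuration pointing at a (hypothetical) name node
            Configuration conf = DHPUtils.getHadoopConfiguration("hdfs://nn1.example.org:8020");
            // JSON round-trip of a bean through HDFS via the shared MAPPER
            MDStoreVersion version = DHPUtils.readHdfsFileAs(conf, "/data/mdstore/version.json", MDStoreVersion.class);
            DHPUtils.writeHdfsFile(conf, DHPUtils.MAPPER.writeValueAsString(version), "/data/mdstore/version.copy.json");
            // hand a value back to the enclosing Oozie action, if any
            DHPUtils.populateOOZIEEnv("mdStoreVersionId", version.getId());
        }
    }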
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
index 1b9e070fe..3f64eb953 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
@@ -18,7 +18,7 @@ import org.apache.hadoop.fs.Path;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
+import eu.dnetlib.dhp.collection.worker.HttpConnector2;
 
 /**
  * Applies the parsing of a csv file and writes the Serialization of it in hdfs
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
index 2ad3f5b34..c661909b0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
@@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
+import eu.dnetlib.dhp.collection.worker.HttpConnector2;
 
 /**
  * Applies the parsing of an excel file and writes the Serialization of it in hdfs
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
deleted file mode 100644
index 8dad5bb81..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
+++ /dev/null
@@ -1,69 +0,0 @@
-
-package eu.dnetlib.dhp.aggregation.common;
-
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-
-public class AggregationUtility {
-
-	private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class);
-
-	public static final ObjectMapper MAPPER = new ObjectMapper();
-
-	public static void writeHdfsFile(final Configuration conf, final String content, final String path)
-		throws IOException {
-
-		log.info("writing file {}, size {}", path, content.length());
-		try (FileSystem fs = FileSystem.get(conf);
-			BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) {
-			os.write(content.getBytes(StandardCharsets.UTF_8));
-			os.flush();
-		}
-	}
-
-	public static String readHdfsFile(Configuration conf, String path) throws IOException {
-		log.info("reading file {}", path);
-
-		try (FileSystem fs = FileSystem.get(conf)) {
-			final Path p = new Path(path);
-			if (!fs.exists(p)) {
-				throw new FileNotFoundException(path);
-			}
-			return IOUtils.toString(fs.open(p));
-		}
-	}
-
-	public static <T> T readHdfsFileAs(Configuration conf, String path, Class<T> clazz) throws IOException {
-		return MAPPER.readValue(readHdfsFile(conf, path), clazz);
-	}
-
-	public static void saveDataset(final Dataset<MetadataRecord> mdstore, final String targetPath) {
-		log.info("saving dataset in: {}", targetPath);
-		mdstore
-			.write()
-			.mode(SaveMode.Overwrite)
-			.format("parquet")
-			.save(targetPath);
-	}
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
index 3e471cfc8..9a47a1d66 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
@@ -1,18 +1,14 @@
 
 package eu.dnetlib.dhp.aggregation.mdstore;
 
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
 import static eu.dnetlib.dhp.application.ApplicationUtils.*;
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStream;
 import java.net.URI;
-import java.util.Properties;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -80,29 +76,20 @@ public class MDStoreActionNode {
 					throw new IllegalArgumentException(
 						"invalid MDStoreVersion value current is " + mdStoreVersion_params);
 				}
+				Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
 
-				Configuration conf = new Configuration();
-				// Set FileSystem URI
-				conf.set("fs.defaultFS", hdfsuri);
-				// Because of Maven
-				conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-				conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+				try (
+					FileSystem fs = FileSystem.get(URI.create(hdfsuri), getHadoopConfiguration(hdfsuri));
+					FSDataInputStream inputStream = fs.open(hdfstoreSizepath)) {
 
-				System.setProperty("hadoop.home.dir", "/");
-				// Get the filesystem - HDFS
-				FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
+					final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
 
-				Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + "/size");
+					fs.create(hdfstoreSizepath);
+					DNetRestClient
+						.doGET(
+							String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
+				}
 
-				FSDataInputStream inputStream = fs.open(hdfstoreSizepath);
-
-				final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
-
-				inputStream.close();
-				fs.create(hdfstoreSizepath);
-
-				DNetRestClient
-					.doGET(String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
 				break;
 			}
 			case ROLLBACK: {
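Note: the COMMIT branch above replaces the hand-rolled Configuration setup and manually closed streams with getHadoopConfiguration(...) plus try-with-resources, so the FileSystem and the FSDataInputStream are released even when parsing or the REST call fails. A hedged sketch of the same read-then-close pattern; the URI, path, and class name are illustrative:

    import java.net.URI;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;

    public class MdStoreSizeSketch {
        static long readMdStoreSize(String hdfsuri, String mdStorePath) throws Exception {
            // "/size" stands in for the MDSTORE_SIZE_PATH suffix referenced above
            Path sizePath = new Path(mdStorePath + "/size");
            try (FileSystem fs = FileSystem.get(URI.create(hdfsuri), getHadoopConfiguration(hdfsuri));
                    FSDataInputStream in = fs.open(sizePath)) {
                // both resources close automatically, even if parsing throws
                return Long.parseLong(IOUtils.toString(in));
            }
        }
    }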
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index 5839df2d0..5c24bb7ec 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -2,8 +2,8 @@
 package eu.dnetlib.dhp.collection;
 
 import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
index d0905aade..614aa4e69 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
@@ -4,7 +4,7 @@ package eu.dnetlib.dhp.collection.plugin;
 import java.util.stream.Stream;
 
 import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public interface CollectorPlugin {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
index 29e12f312..7ec2f09be 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
@@ -15,8 +15,8 @@ import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
+import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.HttpClientParams;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public class OaiCollectorPlugin implements CollectorPlugin {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
index 667e7a3d3..8d913b68f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
@@ -17,9 +17,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
-import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner;
+import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.HttpConnector2;
+import eu.dnetlib.dhp.collection.worker.XmlCleaner;
 
 public class OaiIterator implements Iterator<String> {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
index c751a94e7..f63fa37a1 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
@@ -3,9 +3,9 @@ package eu.dnetlib.dhp.collection.plugin.oai;
 
 import java.util.Iterator;
 
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
+import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.HttpClientParams;
+import eu.dnetlib.dhp.collection.worker.HttpConnector2;
 
 public class OaiIteratorFactory {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java
similarity index 84%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java
index ab7dad077..9668098f0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java
@@ -1,9 +1,8 @@
 
-package eu.dnetlib.dhp.collection.worker.utils;
+package eu.dnetlib.dhp.collection.worker;
 
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
-import eu.dnetlib.dhp.collection.worker.CollectorException;
 
 public class CollectorPluginFactory {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java
similarity index 86%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginReport.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java
index b7bf539dc..2da6ac8f9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginReport.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java
@@ -1,7 +1,7 @@
 
-package eu.dnetlib.dhp.collection.worker.utils;
+package eu.dnetlib.dhp.collection.worker;
 
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.MAPPER;
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -45,7 +45,7 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
 	}
 
 	public Boolean isSuccess() {
-		return Boolean.valueOf(get(SUCCESS));
+		return containsKey(SUCCESS) && Boolean.valueOf(get(SUCCESS));
 	}
 
 	public void setSuccess(Boolean success) {
@@ -58,7 +58,7 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
 		if (Objects.nonNull(fos)) {
 			log.info("writing report {} to {}", data, path.toString());
 			IOUtils.write(data, fos);
-			ApplicationUtils.populateOOZIEEnv(this);
+			populateOOZIEEnv(this);
 		}
 	}
 }
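Note: the isSuccess() change above makes the absent-key case explicit: a report that never saw setSuccess(...) now short-circuits on containsKey instead of relying on Boolean.valueOf(null) happening to return false. A hedged sketch of the expected behaviour (assuming the no-arg constructor that Jackson uses in the test at the end of this changeset):

    CollectorPluginReport report = new CollectorPluginReport();
    report.isSuccess();      // false: no SUCCESS entry yet, value never parsed
    report.setSuccess(true); // stores "true" under the SUCCESS key
    report.isSuccess();      // true: key present and parses to true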
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
index b0efd088c..71dee0d03 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
@@ -2,7 +2,6 @@
 package eu.dnetlib.dhp.collection.worker;
 
 import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.SEQUENCE_FILE_NAME;
-import static eu.dnetlib.dhp.application.ApplicationUtils.*;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -17,10 +16,6 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
-import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 import eu.dnetlib.dhp.message.MessageSender;
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
index 8f26074c3..a6c254d42 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
@@ -2,29 +2,21 @@
 package eu.dnetlib.dhp.collection.worker;
 
 import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
-import static eu.dnetlib.dhp.application.ApplicationUtils.*;
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
 import java.io.IOException;
 import java.util.Optional;
 
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileSystemUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
-import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 import eu.dnetlib.dhp.message.MessageSender;
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
index e0e402cfb..3a8145946 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
@@ -2,10 +2,7 @@
 package eu.dnetlib.dhp.collection.worker;
 
 import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.REPORT_FILE_NAME;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.MAPPER;
-import static eu.dnetlib.dhp.application.ApplicationUtils.getHadoopConfiguration;
-import static eu.dnetlib.dhp.application.ApplicationUtils.populateOOZIEEnv;
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -13,15 +10,11 @@ import java.util.Objects;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
 
 /**
  * CollectorWorkerReporter
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpClientParams.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java
similarity index 96%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpClientParams.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java
index e77f3680f..315dd27c2 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpClientParams.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker.utils;
+package eu.dnetlib.dhp.collection.worker;
 
 /**
  * Bundles the http connection parameters driving the client behaviour.
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java
similarity index 93%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java
index 68b1ef8ad..ee3acf432 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector2.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java
@@ -1,5 +1,7 @@
 
-package eu.dnetlib.dhp.collection.worker.utils;
+package eu.dnetlib.dhp.collection.worker;
+
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -13,8 +15,6 @@ import org.apache.http.HttpHeaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-
 /**
  * Migrated from https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/HttpConnector.java
 *
@@ -162,11 +162,19 @@ public class HttpConnector2 {
 				}
 			}
 			throw new CollectorException(
-				String.format("Unexpected status code: %s error %s", urlConn.getResponseCode(), report));
+				String
+					.format(
+						"Unexpected status code: %s errors: %s", urlConn.getResponseCode(),
+						MAPPER.writeValueAsString(report)));
 		} catch (MalformedURLException | SocketException | UnknownHostException e) {
 			log.error(e.getMessage(), e);
 			report.put(e.getClass().getName(), e.getMessage());
 			throw new CollectorException(e.getMessage(), e);
+		} catch (SocketTimeoutException e) {
+			log.error(e.getMessage(), e);
+			report.put(e.getClass().getName(), e.getMessage());
+			backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000);
+			return attemptDownload(requestUrl, retryNumber + 1, report);
 		}
 	}
 
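Note: the new SocketTimeoutException branch above retries with a linear backoff: the sleep is retryDelay * retryNumber * 1000 ms, so it grows with each attempt before attemptDownload recurses. A self-contained sketch of the same policy; downloadWithRetry, attemptOnce, and the parameters are illustrative, not the connector's actual API:

    import java.io.IOException;
    import java.net.SocketTimeoutException;

    public class LinearBackoffSketch {
        static String downloadWithRetry(String url, int retryDelay, int maxRetries) throws Exception {
            for (int retryNumber = 1; retryNumber <= maxRetries; retryNumber++) {
                try {
                    return attemptOnce(url); // hypothetical single-attempt download
                } catch (SocketTimeoutException e) {
                    // same linear schedule as the catch block above
                    Thread.sleep((long) retryDelay * retryNumber * 1000);
                }
            }
            throw new IOException("retries exhausted for " + url);
        }

        static String attemptOnce(String url) throws IOException {
            throw new SocketTimeoutException("stub for illustration");
        }
    }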
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/UnknownCollectorPluginException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java
similarity index 93%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/UnknownCollectorPluginException.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java
index c55d485e2..7134dd069 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/UnknownCollectorPluginException.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker.utils;
+package eu.dnetlib.dhp.collection.worker;
 
 public class UnknownCollectorPluginException extends Exception {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/XmlCleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/XmlCleaner.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java
index 44aeb4d02..41ba02196 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/XmlCleaner.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.collection.worker.utils;
+package eu.dnetlib.dhp.collection.worker;
 
 import java.util.HashMap;
 import java.util.HashSet;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index b735ecb1f..c0a03e081 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -2,8 +2,8 @@
 package eu.dnetlib.dhp.transformation;
 
 import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
 
 import java.io.IOException;
 import java.util.Map;
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
index e6fda9be0..7f597f950 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
@@ -14,7 +14,7 @@ import org.junit.jupiter.api.Test;
 
 import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
 import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector2;
+import eu.dnetlib.dhp.collection.worker.HttpConnector2;
 
 @Disabled
 public class EXCELParserTest {
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
index 65c2833eb..80bafd6d8 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
@@ -3,17 +3,13 @@ package eu.dnetlib.dhp.collector.worker;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-import java.nio.file.Path;
-
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.collection.worker.CollectorWorker;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
-import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
+import eu.dnetlib.dhp.collection.worker.CollectorPluginFactory;
+import eu.dnetlib.dhp.collection.worker.HttpClientParams;
 import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 @Disabled
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
index 69376d5eb..d665e5b5f 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
@@ -1,13 +1,14 @@
 
 package eu.dnetlib.dhp.collector.worker.utils;
 
+import static eu.dnetlib.dhp.utils.DHPUtils.*;
+
 import java.io.IOException;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
 
 public class CollectorPluginReportTest {
 
@@ -17,11 +18,11 @@ public class CollectorPluginReportTest {
 		r1.put("a", "b");
 		r1.setSuccess(true);
 
-		String s = AggregationUtility.MAPPER.writeValueAsString(r1);
+		String s = MAPPER.writeValueAsString(r1);
 
 		Assertions.assertNotNull(s);
 
-		CollectorPluginReport r2 = AggregationUtility.MAPPER.readValue(s, CollectorPluginReport.class);
+		CollectorPluginReport r2 = MAPPER.readValue(s, CollectorPluginReport.class);
 
 		Assertions.assertTrue(r2.isSuccess(), "should be true");
 	}