From 29c6f7e255cd6b23d254833789827f8b6c869a73 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 12 Feb 2021 12:31:02 +0100 Subject: [PATCH] classes related to the collection workflow moved into common package; implemented MongoDB collection plugins --- .../eu/dnetlib/dhp/message/MessageSender.java | 8 +- dhp-workflows/dhp-aggregation/pom.xml | 5 +- .../actionmanager/project/utils/ReadCSV.java | 2 +- .../project/utils/ReadExcel.java | 2 +- .../{worker => }/CollectorException.java | 2 +- .../{worker => }/CollectorPluginReport.java | 9 +- .../{worker => }/CollectorWorker.java | 46 +++++-- .../CollectorWorkerApplication.java | 32 +++-- .../{worker => }/CollectorWorkerReporter.java | 2 +- .../{worker => }/HttpClientParams.java | 2 +- .../{worker => }/HttpConnector2.java | 2 +- .../UnknownCollectorPluginException.java | 2 +- .../collection/{worker => }/XmlCleaner.java | 2 +- .../collection/plugin/CollectorPlugin.java | 4 +- .../mongodb/MongoDbCollectorPlugin.java | 59 ++++++++ .../mongodb/MongoDbDumpCollectorPlugin.java | 54 ++++++++ .../plugin/oai/OaiCollectorPlugin.java | 6 +- .../collection/plugin/oai/OaiIterator.java | 8 +- .../plugin/oai/OaiIteratorFactory.java | 6 +- .../worker/CollectorPluginFactory.java | 20 --- .../dhp/collection/oozie_app/workflow.xml | 4 +- .../project/EXCELParserTest.java | 4 +- .../dhp/collection/CollectionJobTest.java | 130 ------------------ .../collection/CollectionWorkflowTest.java | 113 +++++++++++++++ .../GenerateNativeStoreSparkJobTest.java} | 84 +++++++++-- .../CollectorWorkerApplicationTests.java | 10 -- .../utils/CollectorPluginReportTest.java | 2 +- .../transformation/TransformationJobTest.java | 4 +- .../dnetlib/dhp/collection/apiDescriptor.json | 10 ++ .../eu/dnetlib/dhp/oa/provision/fields.xml | 2 + .../eu/dnetlib/dhp/oa/provision/record.xml | 4 +- 31 files changed, 411 insertions(+), 229 deletions(-) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/CollectorException.java (93%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/CollectorPluginReport.java (90%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/CollectorWorker.java (64%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/CollectorWorkerApplication.java (86%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/CollectorWorkerReporter.java (97%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/HttpClientParams.java (97%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/HttpConnector2.java (99%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/UnknownCollectorPluginException.java (94%) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/{worker => }/XmlCleaner.java (99%) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java delete mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java rename dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/{aggregation/AggregationJobTest.java => collection/GenerateNativeStoreSparkJobTest.java} (73%) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java index 3f9d07a7eb..16bb0c97ea 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java @@ -1,6 +1,9 @@ package eu.dnetlib.dhp.message; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; @@ -30,13 +33,15 @@ public class MessageSender { private final String workflowId; + private ExecutorService executorService = Executors.newCachedThreadPool(); + public MessageSender(final String dnetMessageEndpoint, final String workflowId) { this.workflowId = workflowId; this.dnetMessageEndpoint = dnetMessageEndpoint; } public void sendMessage(final Message message) { - new Thread(() -> _sendMessage(message)).start(); + executorService.submit(() -> _sendMessage(message)); } public void sendMessage(final Long current, final Long total) { @@ -67,7 +72,6 @@ public class MessageSender { .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) .setSocketTimeout(SOCKET_TIMEOUT_MS) .build(); - ; try (final CloseableHttpClient client = HttpClients .custom() diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml index f0ee425429..6887be55e4 100644 --- a/dhp-workflows/dhp-aggregation/pom.xml +++ b/dhp-workflows/dhp-aggregation/pom.xml @@ -106,7 +106,10 @@ commons-compress - + + org.mongodb + mongo-java-driver + diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java index 3f64eb9537..cad6b94e13 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java @@ -18,7 +18,7 @@ import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.worker.HttpConnector2; +import eu.dnetlib.dhp.collection.HttpConnector2; /** * Applies the parsing of a csv file and writes the Serialization of it in hdfs diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java index c661909b02..fc3b38ac58 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java @@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.worker.HttpConnector2; +import eu.dnetlib.dhp.collection.HttpConnector2; /** * Applies the parsing of an excel file and writes the Serialization of it in hdfs diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorException.java similarity index 93% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorException.java index 71d225f131..144d297e62 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorException.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; public class CollectorException extends Exception { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java similarity index 90% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java index 2da6ac8f99..a7204523ab 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; import static eu.dnetlib.dhp.utils.DHPUtils.*; @@ -17,15 +17,10 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonIgnore; -import eu.dnetlib.dhp.application.ApplicationUtils; - public class CollectorPluginReport extends LinkedHashMap implements Closeable { private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class); - @JsonIgnore - private FileSystem fs; - @JsonIgnore private Path path; @@ -38,9 +33,7 @@ public class CollectorPluginReport extends LinkedHashMap impleme } public CollectorPluginReport(FileSystem fs, Path path) throws IOException { - this.fs = fs; this.path = path; - this.fos = fs.create(path); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java similarity index 64% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index 945eff8b07..ace725bfd7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -1,23 +1,27 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.DeflateCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin; +import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; +import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; import eu.dnetlib.dhp.message.MessageSender; public class CollectorWorker { @@ -26,7 +30,7 @@ public class CollectorWorker { private final ApiDescriptor api; - private final Configuration conf; + private final FileSystem fileSystem; private final MDStoreVersion mdStoreVersion; @@ -38,13 +42,13 @@ public class CollectorWorker { public CollectorWorker( final ApiDescriptor api, - final Configuration conf, + final FileSystem fileSystem, final MDStoreVersion mdStoreVersion, final HttpClientParams clientParams, final MessageSender messageSender, final CollectorPluginReport report) { this.api = api; - this.conf = conf; + this.fileSystem = fileSystem; this.mdStoreVersion = mdStoreVersion; this.clientParams = clientParams; this.messageSender = messageSender; @@ -56,16 +60,16 @@ public class CollectorWorker { final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME; log.info("outputPath path is {}", outputPath); - final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(clientParams, api.getProtocol()); + final CollectorPlugin plugin = getCollectorPlugin(); final AtomicInteger counter = new AtomicInteger(0); try (SequenceFile.Writer writer = SequenceFile .createWriter( - conf, + fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class), - SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) { + SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) { final IntWritable key = new IntWritable(counter.get()); final Text value = new Text(); plugin @@ -94,4 +98,26 @@ public class CollectorWorker { } } + private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException { + switch (StringUtils.lowerCase(StringUtils.trim(api.getProtocol()))) { + case "oai": + return new OaiCollectorPlugin(clientParams); + case "other": + final String plugin = Optional + .ofNullable(api.getParams().get("other_plugin_type")) + .orElseThrow(() -> new UnknownCollectorPluginException("other_plugin_type")); + + switch (plugin) { + case "mdstore_mongodb_dump": + return new MongoDbDumpCollectorPlugin(fileSystem); + case "mdstore_mongodb": + return new MongoDbCollectorPlugin(); + default: + throw new UnknownCollectorPluginException("Unknown plugin type: " + plugin); + } + default: + throw new UnknownCollectorPluginException("Unknown protocol: " + api.getProtocol()); + } + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java similarity index 86% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java index 17f09ee5a1..0eea0837c0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; import static eu.dnetlib.dhp.common.Constants.*; import static eu.dnetlib.dhp.utils.DHPUtils.*; @@ -9,7 +9,6 @@ import java.util.Optional; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -17,7 +16,6 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.message.MessageSender; /** @@ -32,6 +30,12 @@ public class CollectorWorkerApplication { private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class); + private FileSystem fileSystem; + + public CollectorWorkerApplication(FileSystem fileSystem) { + this.fileSystem = fileSystem; + } + /** * @param args */ @@ -63,6 +67,18 @@ public class CollectorWorkerApplication { final String workflowId = argumentParser.get("workflowId"); log.info("workflowId is {}", workflowId); + final HttpClientParams clientParams = getClientParams(argumentParser); + + final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class); + final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri)); + + new CollectorWorkerApplication(fileSystem) + .run(mdStoreVersion, clientParams, api, dnetMessageManagerURL, workflowId); + } + + protected void run(String mdStoreVersion, HttpClientParams clientParams, ApiDescriptor api, + String dnetMessageManagerURL, String workflowId) throws IOException { + final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId); final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); @@ -70,13 +86,9 @@ public class CollectorWorkerApplication { final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME; log.info("report path is {}", reportPath); - final HttpClientParams clientParams = getClientParams(argumentParser); - - final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class); - final Configuration conf = getHadoopConfiguration(hdfsuri); - - try (CollectorPluginReport report = new CollectorPluginReport(FileSystem.get(conf), new Path(reportPath))) { - final CollectorWorker worker = new CollectorWorker(api, conf, currentVersion, clientParams, ms, report); + try (CollectorPluginReport report = new CollectorPluginReport(fileSystem, new Path(reportPath))) { + final CollectorWorker worker = new CollectorWorker(api, fileSystem, currentVersion, clientParams, ms, + report); worker.collect(); report.setSuccess(true); } catch (Throwable e) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java similarity index 97% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java index 3f6fc47849..d8cf3ec02e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; import static eu.dnetlib.dhp.common.Constants.REPORT_FILE_NAME; import static eu.dnetlib.dhp.utils.DHPUtils.*; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpClientParams.java similarity index 97% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpClientParams.java index f457904604..ab0d5cc02a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpClientParams.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; /** * Bundles the http connection parameters driving the client behaviour. diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java index 368c89509e..72a2a70a2f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; import static eu.dnetlib.dhp.utils.DHPUtils.*; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/UnknownCollectorPluginException.java similarity index 94% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/UnknownCollectorPluginException.java index 7134dd0699..2b0a98e530 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/UnknownCollectorPluginException.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; public class UnknownCollectorPluginException extends Exception { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java similarity index 99% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java index 41ba021969..c674031f62 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection.worker; +package eu.dnetlib.dhp.collection; import java.util.HashMap; import java.util.HashSet; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index e2be481ed3..0a4b3a892f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -4,8 +4,8 @@ package eu.dnetlib.dhp.collection.plugin; import java.util.stream.Stream; import eu.dnetlib.dhp.collection.ApiDescriptor; -import eu.dnetlib.dhp.collection.worker.CollectorException; -import eu.dnetlib.dhp.collection.worker.CollectorPluginReport; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.CollectorPluginReport; public interface CollectorPlugin { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java new file mode 100644 index 0000000000..7d1952f9cf --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java @@ -0,0 +1,59 @@ + +package eu.dnetlib.dhp.collection.plugin.mongodb; + +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.bson.Document; + +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.CollectorPluginReport; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; + +public class MongoDbCollectorPlugin implements CollectorPlugin { + + public static final String MONGODB_HOST = "mongodb_host"; + public static final String MONGODB_PORT = "mongodb_port"; + public static final String MONGODB_COLLECTION = "mongodb_collection"; + public static final String MONGODB_DBNAME = "mongodb_dbname"; + + @Override + public Stream collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException { + + final String host = Optional + .ofNullable(api.getParams().get(MONGODB_HOST)) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST))); + + final Integer port = Optional + .ofNullable(api.getParams().get(MONGODB_PORT)) + .map(Integer::parseInt) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT))); + + final String dbName = Optional + .ofNullable(api.getParams().get(MONGODB_DBNAME)) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME))); + + final String collection = Optional + .ofNullable(api.getParams().get(MONGODB_COLLECTION)) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION))); + + final MongoClient mongoClient = new MongoClient(host, port); + final MongoDatabase database = mongoClient.getDatabase(dbName); + final MongoCollection mdstore = database.getCollection(collection); + + long size = mdstore.count(); + + return StreamSupport + .stream( + Spliterators.spliterator(mdstore.find().iterator(), size, Spliterator.SIZED), false) + .map(doc -> doc.getString("body")); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java new file mode 100644 index 0000000000..d08732593f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java @@ -0,0 +1,54 @@ + +package eu.dnetlib.dhp.collection.plugin.mongodb; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Optional; +import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.CollectorPluginReport; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.utils.DHPUtils; + +public class MongoDbDumpCollectorPlugin implements CollectorPlugin { + + public static final String PATH_PARAM = "path"; + public static final String BODY_JSONPATH = "$.body"; + + public FileSystem fileSystem; + + public MongoDbDumpCollectorPlugin(FileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + @Override + public Stream collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException { + + final Path path = Optional + .ofNullable(api.getParams().get("path")) + .map(Path::new) + .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", PATH_PARAM))); + + try { + if (!fileSystem.exists(path)) { + throw new CollectorException("path does not exist: " + path.toString()); + } + + return new BufferedReader( + new InputStreamReader(new GZIPInputStream(fileSystem.open(path)), Charset.defaultCharset())) + .lines() + .map(s -> DHPUtils.getJPathString(BODY_JSONPATH, s)); + + } catch (IOException e) { + throw new CollectorException(e); + } + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java index 84228abf4f..8efdeb838f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java @@ -14,10 +14,10 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.CollectorPluginReport; +import eu.dnetlib.dhp.collection.HttpClientParams; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; -import eu.dnetlib.dhp.collection.worker.CollectorException; -import eu.dnetlib.dhp.collection.worker.CollectorPluginReport; -import eu.dnetlib.dhp.collection.worker.HttpClientParams; public class OaiCollectorPlugin implements CollectorPlugin { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java index 8d913b68fd..edfcb7bb5a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java @@ -16,10 +16,10 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import eu.dnetlib.dhp.collection.worker.CollectorException; -import eu.dnetlib.dhp.collection.worker.CollectorPluginReport; -import eu.dnetlib.dhp.collection.worker.HttpConnector2; -import eu.dnetlib.dhp.collection.worker.XmlCleaner; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.CollectorPluginReport; +import eu.dnetlib.dhp.collection.HttpConnector2; +import eu.dnetlib.dhp.collection.XmlCleaner; public class OaiIterator implements Iterator { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java index f63fa37a18..d7b5de087d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java @@ -3,9 +3,9 @@ package eu.dnetlib.dhp.collection.plugin.oai; import java.util.Iterator; -import eu.dnetlib.dhp.collection.worker.CollectorPluginReport; -import eu.dnetlib.dhp.collection.worker.HttpClientParams; -import eu.dnetlib.dhp.collection.worker.HttpConnector2; +import eu.dnetlib.dhp.collection.CollectorPluginReport; +import eu.dnetlib.dhp.collection.HttpClientParams; +import eu.dnetlib.dhp.collection.HttpConnector2; public class OaiIteratorFactory { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java deleted file mode 100644 index 9668098f09..0000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java +++ /dev/null @@ -1,20 +0,0 @@ - -package eu.dnetlib.dhp.collection.worker; - -import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; -import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; - -public class CollectorPluginFactory { - - public static CollectorPlugin getPluginByProtocol(final HttpClientParams clientParams, final String protocol) - throws UnknownCollectorPluginException { - if (protocol == null) - throw new UnknownCollectorPluginException("protocol cannot be null"); - switch (protocol.toLowerCase().trim()) { - case "oai": - return new OaiCollectorPlugin(clientParams); - default: - throw new UnknownCollectorPluginException("Unknown protocol"); - } - } -} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml index 5497b2c509..1bab596599 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml @@ -98,7 +98,7 @@ - eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication + eu.dnetlib.dhp.collection.CollectorWorkerApplication ${collection_java_xmx} --apidescriptor${apiDescription} --namenode${nameNode} @@ -118,7 +118,7 @@ - eu.dnetlib.dhp.collection.worker.CollectorWorkerReporter + eu.dnetlib.dhp.collection.CollectorWorkerReporter ${collection_java_xmx} --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} --namenode${nameNode} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java index 7f597f9507..acb4caa226 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java @@ -13,8 +13,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser; -import eu.dnetlib.dhp.collection.worker.CollectorException; -import eu.dnetlib.dhp.collection.worker.HttpConnector2; +import eu.dnetlib.dhp.collection.CollectorException; +import eu.dnetlib.dhp.collection.HttpConnector2; @Disabled public class EXCELParserTest { diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java deleted file mode 100644 index 6f7bb2bc2d..0000000000 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java +++ /dev/null @@ -1,130 +0,0 @@ - -package eu.dnetlib.dhp.collection; - -import static org.junit.jupiter.api.Assertions.*; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.*; -import org.junit.jupiter.api.io.TempDir; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.data.mdstore.manager.common.model.MDStoreCurrentVersion; -import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.model.mdstore.MetadataRecord; -import eu.dnetlib.dhp.model.mdstore.Provenance; -import eu.dnetlib.dhp.schema.common.ModelSupport; - -public class CollectionJobTest { - - private static SparkSession spark; - - @BeforeAll - public static void beforeAll() { - SparkConf conf = new SparkConf(); - conf.setAppName(CollectionJobTest.class.getSimpleName()); - conf.setMaster("local"); - spark = SparkSession.builder().config(conf).getOrCreate(); - } - - @AfterAll - public static void afterAll() { - spark.stop(); - } - - @Test - public void testJSONSerialization() throws Exception { - final String s = IOUtils.toString(getClass().getResourceAsStream("input.json")); - System.out.println("s = " + s); - final ObjectMapper mapper = new ObjectMapper(); - MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class); - - assertNotNull(mi); - - } - - @Test - public void tesCollection(@TempDir Path testDir) throws Exception { - final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); - Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance)); - - GenerateNativeStoreSparkJob - .main( - new String[] { - "issm", "true", - "-w", "wid", - "-e", "XML", - "-d", "" + System.currentTimeMillis(), - "-p", new ObjectMapper().writeValueAsString(provenance), - "-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", - "-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(), - "-o", testDir.toString() + "/store", - "-t", "true", - "-ru", "", - "-rp", "", - "-rh", "", - "-ro", "", - "-rr", "" - }); - - // TODO introduce useful assertions - - } - - @Test - public void testGenerationMetadataRecord() throws Exception { - - final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); - - final MetadataRecord record = GenerateNativeStoreSparkJob - .parseRecord( - xml, - "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", - "XML", - new Provenance("foo", "bar", "ns_prefix"), - System.currentTimeMillis(), - null, - null); - - assertNotNull(record.getId()); - assertNotNull(record.getOriginalId()); - } - - @Test - public void TestEquals() throws IOException { - - final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); - final MetadataRecord record = GenerateNativeStoreSparkJob - .parseRecord( - xml, - "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", - "XML", - new Provenance("foo", "bar", "ns_prefix"), - System.currentTimeMillis(), - null, - null); - final MetadataRecord record1 = GenerateNativeStoreSparkJob - .parseRecord( - xml, - "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", - "XML", - new Provenance("foo", "bar", "ns_prefix"), - System.currentTimeMillis(), - null, - null); - - record.setBody("ciao"); - record1.setBody("mondo"); - - assertNotNull(record); - assertNotNull(record1); - assertEquals(record, record1); - } -} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java new file mode 100644 index 0000000000..cd6275d7fd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java @@ -0,0 +1,113 @@ + +package eu.dnetlib.dhp.collection; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@ExtendWith(MockitoExtension.class) +public class CollectionWorkflowTest { + + private static final Logger log = LoggerFactory.getLogger(CollectionWorkflowTest.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static Path workingDir; + + private static DistributedFileSystem fileSystem; + + // private static MiniDFSCluster hdfsCluster; + + private static ApiDescriptor api; + private static String mdStoreVersion; + + private static final String encoding = "XML"; + private static final String dateOfCollection = System.currentTimeMillis() + ""; + private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']"; + private static String provenance; + + private static final String msgMgrUrl = "http://localhost:%s/mock/mvc/dhp/message"; + + @BeforeAll + protected static void beforeAll() throws Exception { + provenance = IOUtils + .toString(CollectionWorkflowTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json")); + + workingDir = Files.createTempDirectory(CollectionWorkflowTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + /* + * Configuration conf = new Configuration(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, + * workingDir.toString()); hdfsCluster = new MiniDFSCluster.Builder(conf).build(); fileSystem = + * hdfsCluster.getFileSystem(); api = OBJECT_MAPPER .readValue( + * IOUtils.toString(CollectionWorkflowTest.class.getResourceAsStream("apiDescriptor.json")), + * ApiDescriptor.class); mdStoreVersion = OBJECT_MAPPER + * .writeValueAsString(prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json")); + */ + } + + @AfterAll + protected static void tearDown() { + /* + * hdfsCluster.shutdown(); FileUtil.fullyDelete(workingDir.toFile()); + */ + + } + + /** + + + eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication + ${collection_java_xmx} + --apidescriptor${apiDescription} + --namenode${nameNode} + --workflowId${workflowId} + --dnetMessageManagerURL${dnetMessageManagerURL} + --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + --maxNumberOfRetry${maxNumberOfRetry} + --requestDelay${requestDelay} + --retryDelay${retryDelay} + --connectTimeOut${connectTimeOut} + --readTimeOut${readTimeOut} + + + + + + */ + // @Test + // @Order(1) + public void testCollectorWorkerApplication() throws Exception { + + final HttpClientParams httpClientParams = new HttpClientParams(); + + // String url = String.format(msgMgrUrl, wireMockServer.port()); + + // new CollectorWorkerApplication(fileSystem).run(mdStoreVersion, httpClientParams, api, url, "1234"); + + } + + public static MDStoreVersion prepareVersion(String filename) throws IOException { + MDStoreVersion mdstore = OBJECT_MAPPER + .readValue(IOUtils.toString(CollectionWorkflowTest.class.getResource(filename)), MDStoreVersion.class); + mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString())); + return mdstore; + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java similarity index 73% rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java index ff3ff3b6e0..723f030a6a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java @@ -1,8 +1,9 @@ -package eu.dnetlib.dhp.aggregation; +package eu.dnetlib.dhp.collection; import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.io.File; import java.io.FileOutputStream; @@ -36,14 +37,14 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob; +import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; +import eu.dnetlib.dhp.model.mdstore.Provenance; import eu.dnetlib.dhp.transformation.TransformSparkJobNode; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @ExtendWith(MockitoExtension.class) -public class AggregationJobTest extends AbstractVocabularyTest { +public class GenerateNativeStoreSparkJobTest extends AbstractVocabularyTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -58,18 +59,20 @@ public class AggregationJobTest extends AbstractVocabularyTest { private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']"; private static String provenance; - private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class); + private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class); @BeforeAll public static void beforeAll() throws IOException { provenance = IOUtils - .toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json")); - workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName()); + .toString( + GenerateNativeStoreSparkJobTest.class + .getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json")); + workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName()); log.info("using work dir {}", workingDir); SparkConf conf = new SparkConf(); - conf.setAppName(AggregationJobTest.class.getSimpleName()); + conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName()); conf.setMaster("local[*]"); conf.set("spark.driver.host", "localhost"); @@ -81,7 +84,7 @@ public class AggregationJobTest extends AbstractVocabularyTest { encoder = Encoders.bean(MetadataRecord.class); spark = SparkSession .builder() - .appName(AggregationJobTest.class.getSimpleName()) + .appName(GenerateNativeStoreSparkJobTest.class.getSimpleName()) .config(conf) .getOrCreate(); } @@ -202,6 +205,67 @@ public class AggregationJobTest extends AbstractVocabularyTest { } + @Test + public void testJSONSerialization() throws Exception { + final String s = IOUtils.toString(getClass().getResourceAsStream("mdStoreVersion_1.json")); + System.out.println("s = " + s); + final ObjectMapper mapper = new ObjectMapper(); + MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class); + + assertNotNull(mi); + + } + + @Test + public void testGenerationMetadataRecord() throws Exception { + + final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); + + final MetadataRecord record = GenerateNativeStoreSparkJob + .parseRecord( + xml, + "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", + "XML", + new Provenance("foo", "bar", "ns_prefix"), + System.currentTimeMillis(), + null, + null); + + assertNotNull(record.getId()); + assertNotNull(record.getOriginalId()); + } + + @Test + public void testEquals() throws IOException { + + final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); + final MetadataRecord record = GenerateNativeStoreSparkJob + .parseRecord( + xml, + "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", + "XML", + new Provenance("foo", "bar", "ns_prefix"), + System.currentTimeMillis(), + null, + null); + final MetadataRecord record1 = GenerateNativeStoreSparkJob + .parseRecord( + xml, + "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", + "XML", + new Provenance("foo", "bar", "ns_prefix"), + System.currentTimeMillis(), + null, + null); + + record.setBody("ciao"); + record1.setBody("mondo"); + + assertNotNull(record); + assertNotNull(record1); + assertEquals(record, record1); + } + protected void verify(MDStoreVersion mdStoreVersion) throws IOException { Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists()); @@ -226,7 +290,7 @@ public class AggregationJobTest extends AbstractVocabularyTest { Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal"); } - private MDStoreVersion prepareVersion(String filename) throws IOException { + public MDStoreVersion prepareVersion(String filename) throws IOException { MDStoreVersion mdstore = OBJECT_MAPPER .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class); mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString())); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java index 975ef944e0..b5ea5f0696 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java @@ -9,20 +9,10 @@ import org.junit.jupiter.api.Test; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.collection.ApiDescriptor; -import eu.dnetlib.dhp.collection.worker.CollectorPluginFactory; -import eu.dnetlib.dhp.collection.worker.HttpClientParams; @Disabled public class CollectorWorkerApplicationTests { - @Test - public void testFindPlugin() throws Exception { - final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory(); - final HttpClientParams clientParams = new HttpClientParams(); - assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "oai")); - assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "OAI")); - } - @Test public void testCollectionOAI() throws Exception { final ApiDescriptor api = new ApiDescriptor(); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java index d665e5b5fa..fd90a1b843 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java @@ -8,7 +8,7 @@ import java.io.IOException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import eu.dnetlib.dhp.collection.worker.CollectorPluginReport; +import eu.dnetlib.dhp.collection.CollectorPluginReport; public class CollectorPluginReportTest { diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java index 997727e336..356fb252d2 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java @@ -27,7 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest; import eu.dnetlib.dhp.aggregation.common.AggregationCounter; -import eu.dnetlib.dhp.collection.CollectionJobTest; +import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJobTest; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @@ -40,7 +40,7 @@ public class TransformationJobTest extends AbstractVocabularyTest { @BeforeAll public static void beforeAll() throws IOException, ISLookUpException { SparkConf conf = new SparkConf(); - conf.setAppName(CollectionJobTest.class.getSimpleName()); + conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName()); conf.setMaster("local"); spark = SparkSession.builder().config(conf).getOrCreate(); } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json new file mode 100644 index 0000000000..99957cac9c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json @@ -0,0 +1,10 @@ +{ + "id":"api_________::opendoar____::2::0", + "baseUrl":"https://www.alexandria.unisg.ch/cgi/oai2", + "protocol":"oai", + "params": { + "set":"driver", + "metadata_identifier_path":"//*[local-name()\u003d\u0027header\u0027]/*[local-name()\u003d\u0027identifier\u0027]", + "format":"oai_dc" + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml index 1f5cf7b815..0352092b22 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml @@ -79,6 +79,8 @@ + + diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml index b617dbea25..a0ca0aa6fe 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml @@ -39,6 +39,8 @@ Saykally, Jessica N. Keeley, Kristen L. Haris Hatic + Baglioni, Miriam + De Bonis, Michele 2017-06-01 Withania somnifera has been used in traditional medicine for a variety of neural disorders. Recently, chronic neurodegenerative conditions have been @@ -115,7 +117,7 @@ Cell Transplantation - + Cell Transplantation