diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java
index 3f9d07a7eb..16bb0c97ea 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java
@@ -1,6 +1,9 @@
package eu.dnetlib.dhp.message;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -30,13 +33,15 @@ public class MessageSender {
private final String workflowId;
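+ // a cached thread pool reuses idle workers instead of spawning a new Thread per message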
+ private final ExecutorService executorService = Executors.newCachedThreadPool();
+
public MessageSender(final String dnetMessageEndpoint, final String workflowId) {
this.workflowId = workflowId;
this.dnetMessageEndpoint = dnetMessageEndpoint;
}
public void sendMessage(final Message message) {
- new Thread(() -> _sendMessage(message)).start();
+ executorService.submit(() -> _sendMessage(message));
}
public void sendMessage(final Long current, final Long total) {
@@ -67,7 +72,6 @@ public class MessageSender {
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.build();
- ;
try (final CloseableHttpClient client = HttpClients
.custom()
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index f0ee425429..6887be55e4 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -106,7 +106,10 @@
<artifactId>commons-compress</artifactId>
-
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ </dependency>
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
index 3f64eb9537..cad6b94e13 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
@@ -18,7 +18,7 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.HttpConnector2;
+import eu.dnetlib.dhp.collection.HttpConnector2;
/**
* Parses a CSV file and writes its serialization to HDFS
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
index c661909b02..fc3b38ac58 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
@@ -15,7 +15,7 @@ import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.HttpConnector2;
+import eu.dnetlib.dhp.collection.HttpConnector2;
/**
* Parses an Excel file and writes its serialization to HDFS
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorException.java
similarity index 93%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorException.java
index 71d225f131..144d297e62 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorException.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
public class CollectorException extends Exception {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java
similarity index 90%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java
index 2da6ac8f99..a7204523ab 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginReport.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
@@ -17,15 +17,10 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import eu.dnetlib.dhp.application.ApplicationUtils;
-
public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class);
- @JsonIgnore
- private FileSystem fs;
-
@JsonIgnore
private Path path;
@@ -38,9 +33,7 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> impleme
}
public CollectorPluginReport(FileSystem fs, Path path) throws IOException {
- this.fs = fs;
this.path = path;
-
this.fos = fs.create(path);
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
similarity index 64%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index 945eff8b07..ace725bfd7 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -1,23 +1,27 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.message.MessageSender;
public class CollectorWorker {
@@ -26,7 +30,7 @@ public class CollectorWorker {
private final ApiDescriptor api;
- private final Configuration conf;
+ private final FileSystem fileSystem;
private final MDStoreVersion mdStoreVersion;
@@ -38,13 +42,13 @@ public class CollectorWorker {
public CollectorWorker(
final ApiDescriptor api,
- final Configuration conf,
+ final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams,
final MessageSender messageSender,
final CollectorPluginReport report) {
this.api = api;
- this.conf = conf;
+ this.fileSystem = fileSystem;
this.mdStoreVersion = mdStoreVersion;
this.clientParams = clientParams;
this.messageSender = messageSender;
@@ -56,16 +60,16 @@ public class CollectorWorker {
final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("outputPath path is {}", outputPath);
- final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(clientParams, api.getProtocol());
+ final CollectorPlugin plugin = getCollectorPlugin();
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
- conf,
+ fileSystem.getConf(),
SequenceFile.Writer.file(new Path(outputPath)),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class),
- SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {
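+ // DeflateCodec works without the native hadoop libraries that GzipCodec requires for block-compressed SequenceFiles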
+ SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
@@ -94,4 +98,26 @@ public class CollectorWorker {
}
}
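+ // resolves the concrete CollectorPlugin from the api protocol; the "other" protocol delegates to the "other_plugin_type" parameter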
+ private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
+ // defaultString guards against a null protocol, which would otherwise NPE the switch
+ switch (StringUtils.defaultString(StringUtils.lowerCase(StringUtils.trim(api.getProtocol())))) {
+ case "oai":
+ return new OaiCollectorPlugin(clientParams);
+ case "other":
+ final String plugin = Optional
+ .ofNullable(api.getParams().get("other_plugin_type"))
+ .orElseThrow(() -> new UnknownCollectorPluginException("other_plugin_type"));
+
+ switch (plugin) {
+ case "mdstore_mongodb_dump":
+ return new MongoDbDumpCollectorPlugin(fileSystem);
+ case "mdstore_mongodb":
+ return new MongoDbCollectorPlugin();
+ default:
+ throw new UnknownCollectorPluginException("Unknown plugin type: " + plugin);
+ }
+ default:
+ throw new UnknownCollectorPluginException("Unknown protocol: " + api.getProtocol());
+ }
+ }
+
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java
similarity index 86%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java
index 17f09ee5a1..0eea0837c0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.*;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
@@ -9,7 +9,6 @@ import java.util.Optional;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -17,7 +16,6 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.message.MessageSender;
/**
@@ -32,6 +30,12 @@ public class CollectorWorkerApplication {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
+ private FileSystem fileSystem;
+
+ public CollectorWorkerApplication(FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
+
/**
* @param args
*/
@@ -63,6 +67,18 @@ public class CollectorWorkerApplication {
final String workflowId = argumentParser.get("workflowId");
log.info("workflowId is {}", workflowId);
+ final HttpClientParams clientParams = getClientParams(argumentParser);
+
+ final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
+ final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
+
+ new CollectorWorkerApplication(fileSystem)
+ .run(mdStoreVersion, clientParams, api, dnetMessageManagerURL, workflowId);
+ }
+
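+ // extracted from main() so tests can drive the worker with an injected FileSystem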
+ protected void run(String mdStoreVersion, HttpClientParams clientParams, ApiDescriptor api,
+ String dnetMessageManagerURL, String workflowId) throws IOException {
+
final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
@@ -70,13 +86,9 @@ public class CollectorWorkerApplication {
final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME;
log.info("report path is {}", reportPath);
- final HttpClientParams clientParams = getClientParams(argumentParser);
-
- final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
- final Configuration conf = getHadoopConfiguration(hdfsuri);
-
- try (CollectorPluginReport report = new CollectorPluginReport(FileSystem.get(conf), new Path(reportPath))) {
- final CollectorWorker worker = new CollectorWorker(api, conf, currentVersion, clientParams, ms, report);
+ try (CollectorPluginReport report = new CollectorPluginReport(fileSystem, new Path(reportPath))) {
+ final CollectorWorker worker = new CollectorWorker(api, fileSystem, currentVersion, clientParams, ms,
+ report);
worker.collect();
report.setSuccess(true);
} catch (Throwable e) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java
similarity index 97%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java
index 3f6fc47849..d8cf3ec02e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerReporter.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.REPORT_FILE_NAME;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpClientParams.java
similarity index 97%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpClientParams.java
index f457904604..ab0d5cc02a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpClientParams.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpClientParams.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
/**
* Bundles the http connection parameters driving the client behaviour.
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java
index 368c89509e..72a2a70a2f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/HttpConnector2.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/UnknownCollectorPluginException.java
similarity index 94%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/UnknownCollectorPluginException.java
index 7134dd0699..2b0a98e530 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/UnknownCollectorPluginException.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/UnknownCollectorPluginException.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
public class UnknownCollectorPluginException extends Exception {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java
index 41ba021969..c674031f62 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/XmlCleaner.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/XmlCleaner.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.collection.worker;
+package eu.dnetlib.dhp.collection;
import java.util.HashMap;
import java.util.HashSet;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
index e2be481ed3..0a4b3a892f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
@@ -4,8 +4,8 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.dhp.collection.ApiDescriptor;
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
public interface CollectorPlugin {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java
new file mode 100644
index 0000000000..7d1952f9cf
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.collection.plugin.mongodb;
+
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.bson.Document;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+
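+/** Collects records from a MongoDB mdstore collection, emitting the "body" field of each document. */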
+public class MongoDbCollectorPlugin implements CollectorPlugin {
+
+ public static final String MONGODB_HOST = "mongodb_host";
+ public static final String MONGODB_PORT = "mongodb_port";
+ public static final String MONGODB_COLLECTION = "mongodb_collection";
+ public static final String MONGODB_DBNAME = "mongodb_dbname";
+
+ @Override
+ public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
+
+ final String host = Optional
+ .ofNullable(api.getParams().get(MONGODB_HOST))
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_HOST)));
+
+ final Integer port = Optional
+ .ofNullable(api.getParams().get(MONGODB_PORT))
+ .map(Integer::parseInt)
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_PORT)));
+
+ final String dbName = Optional
+ .ofNullable(api.getParams().get(MONGODB_DBNAME))
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_DBNAME)));
+
+ final String collection = Optional
+ .ofNullable(api.getParams().get(MONGODB_COLLECTION))
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", MONGODB_COLLECTION)));
+
+ final MongoClient mongoClient = new MongoClient(host, port);
+ final MongoDatabase database = mongoClient.getDatabase(dbName);
+ final MongoCollection<Document> mdstore = database.getCollection(collection);
+
+ long size = mdstore.count();
+
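+ // expose the cursor as a lazy stream, advertising the collection size via a SIZED spliterator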
+ return StreamSupport
+ .stream(
+ Spliterators.spliterator(mdstore.find().iterator(), size, Spliterator.SIZED), false)
+ .map(doc -> doc.getString("body"));
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java
new file mode 100644
index 0000000000..d08732593f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.dhp.collection.plugin.mongodb;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Optional;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.utils.DHPUtils;
+
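+/** Collects records from a gzip-compressed MongoDB dump stored on HDFS, one JSON record per line. */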
+public class MongoDbDumpCollectorPlugin implements CollectorPlugin {
+
+ public static final String PATH_PARAM = "path";
+ public static final String BODY_JSONPATH = "$.body";
+
+ public FileSystem fileSystem;
+
+ public MongoDbDumpCollectorPlugin(FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
+
+ final Path path = Optional
+ .ofNullable(api.getParams().get("path"))
+ .map(Path::new)
+ .orElseThrow(() -> new CollectorException(String.format("missing parameter '%s'", PATH_PARAM)));
+
+ try {
+ if (!fileSystem.exists(path)) {
+ throw new CollectorException("path does not exist: " + path.toString());
+ }
+
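+ // read the dump line by line and extract each record body via the $.body JSONPath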
+ return new BufferedReader(
+ new InputStreamReader(new GZIPInputStream(fileSystem.open(path)), Charset.defaultCharset()))
+ .lines()
+ .map(s -> DHPUtils.getJPathString(BODY_JSONPATH, s));
+
+ } catch (IOException e) {
+ throw new CollectorException(e);
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
index 84228abf4f..8efdeb838f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
@@ -14,10 +14,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.HttpClientParams;
public class OaiCollectorPlugin implements CollectorPlugin {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
index 8d913b68fd..edfcb7bb5a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java
@@ -16,10 +16,10 @@ import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.HttpConnector2;
-import eu.dnetlib.dhp.collection.worker.XmlCleaner;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.HttpConnector2;
+import eu.dnetlib.dhp.collection.XmlCleaner;
public class OaiIterator implements Iterator<String> {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
index f63fa37a18..d7b5de087d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java
@@ -3,9 +3,9 @@ package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.Iterator;
-import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
-import eu.dnetlib.dhp.collection.worker.HttpClientParams;
-import eu.dnetlib.dhp.collection.worker.HttpConnector2;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.HttpClientParams;
+import eu.dnetlib.dhp.collection.HttpConnector2;
public class OaiIteratorFactory {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java
deleted file mode 100644
index 9668098f09..0000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorPluginFactory.java
+++ /dev/null
@@ -1,20 +0,0 @@
-
-package eu.dnetlib.dhp.collection.worker;
-
-import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
-
-public class CollectorPluginFactory {
-
- public static CollectorPlugin getPluginByProtocol(final HttpClientParams clientParams, final String protocol)
- throws UnknownCollectorPluginException {
- if (protocol == null)
- throw new UnknownCollectorPluginException("protocol cannot be null");
- switch (protocol.toLowerCase().trim()) {
- case "oai":
- return new OaiCollectorPlugin(clientParams);
- default:
- throw new UnknownCollectorPluginException("Unknown protocol");
- }
- }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
index 5497b2c509..1bab596599 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
@@ -98,7 +98,7 @@
- <main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
+ <main-class>eu.dnetlib.dhp.collection.CollectorWorkerApplication</main-class>
<java-opts>${collection_java_xmx}</java-opts>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
@@ -118,7 +118,7 @@
- <main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerReporter</main-class>
+ <main-class>eu.dnetlib.dhp.collection.CollectorWorkerReporter</main-class>
<java-opts>${collection_java_xmx}</java-opts>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
index 7f597f9507..acb4caa226 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
@@ -13,8 +13,8 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
-import eu.dnetlib.dhp.collection.worker.CollectorException;
-import eu.dnetlib.dhp.collection.worker.HttpConnector2;
+import eu.dnetlib.dhp.collection.CollectorException;
+import eu.dnetlib.dhp.collection.HttpConnector2;
@Disabled
public class EXCELParserTest {
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
deleted file mode 100644
index 6f7bb2bc2d..0000000000
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-
-package eu.dnetlib.dhp.collection;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.*;
-import org.junit.jupiter.api.io.TempDir;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreCurrentVersion;
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-import eu.dnetlib.dhp.model.mdstore.Provenance;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-
-public class CollectionJobTest {
-
- private static SparkSession spark;
-
- @BeforeAll
- public static void beforeAll() {
- SparkConf conf = new SparkConf();
- conf.setAppName(CollectionJobTest.class.getSimpleName());
- conf.setMaster("local");
- spark = SparkSession.builder().config(conf).getOrCreate();
- }
-
- @AfterAll
- public static void afterAll() {
- spark.stop();
- }
-
- @Test
- public void testJSONSerialization() throws Exception {
- final String s = IOUtils.toString(getClass().getResourceAsStream("input.json"));
- System.out.println("s = " + s);
- final ObjectMapper mapper = new ObjectMapper();
- MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
-
- assertNotNull(mi);
-
- }
-
- @Test
- public void tesCollection(@TempDir Path testDir) throws Exception {
- final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
- Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance));
-
- GenerateNativeStoreSparkJob
- .main(
- new String[] {
- "issm", "true",
- "-w", "wid",
- "-e", "XML",
- "-d", "" + System.currentTimeMillis(),
- "-p", new ObjectMapper().writeValueAsString(provenance),
- "-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
- "-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(),
- "-o", testDir.toString() + "/store",
- "-t", "true",
- "-ru", "",
- "-rp", "",
- "-rh", "",
- "-ro", "",
- "-rr", ""
- });
-
- // TODO introduce useful assertions
-
- }
-
- @Test
- public void testGenerationMetadataRecord() throws Exception {
-
- final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
-
- final MetadataRecord record = GenerateNativeStoreSparkJob
- .parseRecord(
- xml,
- "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
- "XML",
- new Provenance("foo", "bar", "ns_prefix"),
- System.currentTimeMillis(),
- null,
- null);
-
- assertNotNull(record.getId());
- assertNotNull(record.getOriginalId());
- }
-
- @Test
- public void TestEquals() throws IOException {
-
- final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
- final MetadataRecord record = GenerateNativeStoreSparkJob
- .parseRecord(
- xml,
- "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
- "XML",
- new Provenance("foo", "bar", "ns_prefix"),
- System.currentTimeMillis(),
- null,
- null);
- final MetadataRecord record1 = GenerateNativeStoreSparkJob
- .parseRecord(
- xml,
- "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
- "XML",
- new Provenance("foo", "bar", "ns_prefix"),
- System.currentTimeMillis(),
- null,
- null);
-
- record.setBody("ciao");
- record1.setBody("mondo");
-
- assertNotNull(record);
- assertNotNull(record1);
- assertEquals(record, record1);
- }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java
new file mode 100644
index 0000000000..cd6275d7fd
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionWorkflowTest.java
@@ -0,0 +1,113 @@
+
+package eu.dnetlib.dhp.collection;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@ExtendWith(MockitoExtension.class)
+public class CollectionWorkflowTest {
+
+ private static final Logger log = LoggerFactory.getLogger(CollectionWorkflowTest.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static Path workingDir;
+
+ private static DistributedFileSystem fileSystem;
+
+ // private static MiniDFSCluster hdfsCluster;
+
+ private static ApiDescriptor api;
+ private static String mdStoreVersion;
+
+ private static final String encoding = "XML";
+ private static final String dateOfCollection = String.valueOf(System.currentTimeMillis());
+ private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
+ private static String provenance;
+
+ private static final String msgMgrUrl = "http://localhost:%s/mock/mvc/dhp/message";
+
+ @BeforeAll
+ protected static void beforeAll() throws Exception {
+ provenance = IOUtils
+ .toString(CollectionWorkflowTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+
+ workingDir = Files.createTempDirectory(CollectionWorkflowTest.class.getSimpleName());
+ log.info("using work dir {}", workingDir);
+
+ /*
+ * Configuration conf = new Configuration();
+ * conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, workingDir.toString());
+ * hdfsCluster = new MiniDFSCluster.Builder(conf).build();
+ * fileSystem = hdfsCluster.getFileSystem();
+ * api = OBJECT_MAPPER.readValue(
+ *     IOUtils.toString(CollectionWorkflowTest.class.getResourceAsStream("apiDescriptor.json")), ApiDescriptor.class);
+ * mdStoreVersion = OBJECT_MAPPER.writeValueAsString(prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json"));
+ */
+ }
+
+ @AfterAll
+ protected static void tearDown() {
+ /*
+ * hdfsCluster.shutdown();
+ * FileUtil.fullyDelete(workingDir.toFile());
+ */
+
+ }
+
+ /**
+ * <action name="...">
+ * <java>
+ * <main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
+ * <java-opts>${collection_java_xmx}</java-opts>
+ * <arg>--apidescriptor</arg><arg>${apiDescription}</arg>
+ * <arg>--namenode</arg><arg>${nameNode}</arg>
+ * <arg>--workflowId</arg><arg>${workflowId}</arg>
+ * <arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
+ * <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+ * <arg>--maxNumberOfRetry</arg><arg>${maxNumberOfRetry}</arg>
+ * <arg>--requestDelay</arg><arg>${requestDelay}</arg>
+ * <arg>--retryDelay</arg><arg>${retryDelay}</arg>
+ * <arg>--connectTimeOut</arg><arg>${connectTimeOut}</arg>
+ * <arg>--readTimeOut</arg><arg>${readTimeOut}</arg>
+ * </java>
+ * <ok to="..."/>
+ * <error to="..."/>
+ * </action>
+ */
+ // @Test
+ // @Order(1)
+ public void testCollectorWorkerApplication() throws Exception {
+
+ final HttpClientParams httpClientParams = new HttpClientParams();
+
+ // String url = String.format(msgMgrUrl, wireMockServer.port());
+
+ // new CollectorWorkerApplication(fileSystem).run(mdStoreVersion, httpClientParams, api, url, "1234");
+
+ }
+
+ public static MDStoreVersion prepareVersion(String filename) throws IOException {
+ MDStoreVersion mdstore = OBJECT_MAPPER
+ .readValue(IOUtils.toString(CollectionWorkflowTest.class.getResource(filename)), MDStoreVersion.class);
+ mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
+ return mdstore;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java
similarity index 73%
rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
rename to dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java
index ff3ff3b6e0..723f030a6a 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java
@@ -1,8 +1,9 @@
-package eu.dnetlib.dhp.aggregation;
+package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.io.File;
import java.io.FileOutputStream;
@@ -36,14 +37,14 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
-public class AggregationJobTest extends AbstractVocabularyTest {
+public class GenerateNativeStoreSparkJobTest extends AbstractVocabularyTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -58,18 +59,20 @@ public class AggregationJobTest extends AbstractVocabularyTest {
private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
private static String provenance;
- private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
+ private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
provenance = IOUtils
- .toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
- workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
+ .toString(
+ GenerateNativeStoreSparkJobTest.class
+ .getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+ workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
- conf.setAppName(AggregationJobTest.class.getSimpleName());
+ conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
@@ -81,7 +84,7 @@ public class AggregationJobTest extends AbstractVocabularyTest {
encoder = Encoders.bean(MetadataRecord.class);
spark = SparkSession
.builder()
- .appName(AggregationJobTest.class.getSimpleName())
+ .appName(GenerateNativeStoreSparkJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@@ -202,6 +205,67 @@ public class AggregationJobTest extends AbstractVocabularyTest {
}
+ @Test
+ public void testJSONSerialization() throws Exception {
+ final String s = IOUtils.toString(getClass().getResourceAsStream("mdStoreVersion_1.json"));
+ System.out.println("s = " + s);
+ final ObjectMapper mapper = new ObjectMapper();
+ MDStoreVersion mi = mapper.readValue(s, MDStoreVersion.class);
+
+ assertNotNull(mi);
+
+ }
+
+ @Test
+ public void testGenerationMetadataRecord() throws Exception {
+
+ final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
+
+ final MetadataRecord record = GenerateNativeStoreSparkJob
+ .parseRecord(
+ xml,
+ "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
+ "XML",
+ new Provenance("foo", "bar", "ns_prefix"),
+ System.currentTimeMillis(),
+ null,
+ null);
+
+ assertNotNull(record.getId());
+ assertNotNull(record.getOriginalId());
+ }
+
+ @Test
+ public void testEquals() throws IOException {
+
+ final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
+ final MetadataRecord record = GenerateNativeStoreSparkJob
+ .parseRecord(
+ xml,
+ "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
+ "XML",
+ new Provenance("foo", "bar", "ns_prefix"),
+ System.currentTimeMillis(),
+ null,
+ null);
+ final MetadataRecord record1 = GenerateNativeStoreSparkJob
+ .parseRecord(
+ xml,
+ "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']",
+ "XML",
+ new Provenance("foo", "bar", "ns_prefix"),
+ System.currentTimeMillis(),
+ null,
+ null);
+
+ record.setBody("ciao");
+ record1.setBody("mondo");
+
+ assertNotNull(record);
+ assertNotNull(record1);
+ assertEquals(record, record1);
+ }
+
protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
@@ -226,7 +290,7 @@ public class AggregationJobTest extends AbstractVocabularyTest {
Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
}
- private MDStoreVersion prepareVersion(String filename) throws IOException {
+ public MDStoreVersion prepareVersion(String filename) throws IOException {
MDStoreVersion mdstore = OBJECT_MAPPER
.readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
index 975ef944e0..b5ea5f0696 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/CollectorWorkerApplicationTests.java
@@ -9,20 +9,10 @@ import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.collection.ApiDescriptor;
-import eu.dnetlib.dhp.collection.worker.CollectorPluginFactory;
-import eu.dnetlib.dhp.collection.worker.HttpClientParams;
@Disabled
public class CollectorWorkerApplicationTests {
- @Test
- public void testFindPlugin() throws Exception {
- final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
- final HttpClientParams clientParams = new HttpClientParams();
- assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "oai"));
- assertNotNull(collectorPluginEnumerator.getPluginByProtocol(clientParams, "OAI"));
- }
-
@Test
public void testCollectionOAI() throws Exception {
final ApiDescriptor api = new ApiDescriptor();
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
index d665e5b5fa..fd90a1b843 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java
@@ -8,7 +8,7 @@ import java.io.IOException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import eu.dnetlib.dhp.collection.worker.CollectorPluginReport;
+import eu.dnetlib.dhp.collection.CollectorPluginReport;
public class CollectorPluginReportTest {
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index f448503bcc..df71a513b7 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -8,8 +8,6 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -42,10 +40,10 @@ public class TransformationJobTest extends AbstractVocabularyTest {
@BeforeAll
public static void beforeAll() throws IOException, ISLookUpException {
-// SparkConf conf = new SparkConf();
-// conf.setAppName(CollectionJobTest.class.getSimpleName());
-// conf.setMaster("local");
-// spark = SparkSession.builder().config(conf).getOrCreate();
+ SparkConf conf = new SparkConf();
+ conf.setAppName(TransformationJobTest.class.getSimpleName());
+ conf.setMaster("local");
+ spark = SparkSession.builder().config(conf).getOrCreate();
}
@BeforeEach
@@ -55,7 +53,7 @@ public class TransformationJobTest extends AbstractVocabularyTest {
@AfterAll
public static void afterAll() {
-// spark.stop();
+ spark.stop();
}
@@ -82,8 +80,12 @@ public class TransformationJobTest extends AbstractVocabularyTest {
// We load the XSLT transformation rule from the classpath
XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
+
MetadataRecord result = tr.call(mr);
+
// Print the record
System.out.println(result.getBody());
// TODO Create significant Assert
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json
new file mode 100644
index 0000000000..99957cac9c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/apiDescriptor.json
@@ -0,0 +1,10 @@
+{
+ "id":"api_________::opendoar____::2::0",
+ "baseUrl":"https://www.alexandria.unisg.ch/cgi/oai2",
+ "protocol":"oai",
+ "params": {
+ "set":"driver",
+ "metadata_identifier_path":"//*[local-name()\u003d\u0027header\u0027]/*[local-name()\u003d\u0027identifier\u0027]",
+ "format":"oai_dc"
+ }
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml
index 1f5cf7b815..0352092b22 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml
@@ -79,6 +79,8 @@
+
+
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml
index b617dbea25..a0ca0aa6fe 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/record.xml
@@ -39,6 +39,8 @@
Saykally, Jessica N.
Keeley, Kristen L.
Haris Hatic
+ Baglioni, Miriam
+ De Bonis, Michele
2017-06-01
Withania somnifera has been used in traditional medicine for a variety
of neural disorders. Recently, chronic neurodegenerative conditions have been
@@ -115,7 +117,7 @@
-
+
Cell Transplantation