diff --git a/dhp-applications/dhp-collector-worker/README.md b/dhp-applications/dhp-collector-worker/README.md
index b6609179d..8298a46be 100644
--- a/dhp-applications/dhp-collector-worker/README.md
+++ b/dhp-applications/dhp-collector-worker/README.md
@@ -1,16 +1,30 @@
Description of the Module
--------------------------
-This module defines a **collector worker application** that run on Hadoop.
+This module defines a **collector worker application** that runs on Hadoop.
-It is responsible of harvesting metadata using different plugin, that should be passed as arguments
-in the main class
+It is responsible for harvesting metadata using different plugins;
+the plugin to use is selected through the arguments passed to the main class.
+
+The collector worker uses a message queue to report the progress
+of the harvesting action (sending **ONGOING** messages). Furthermore,
+at the end of the job it reports some information about the status
+of the collection, i.e. the number of records collected (sending **REPORT** messages).
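+
+A minimal sketch of how the worker emits such messages (mirroring the
+`DnetCollectorWorker` code in this module; `manager`, `counter`, `workflowId`
+and `rabbitOngoingQueue` come from the worker's runtime context):
+
+```java
+final Map<String, String> ongoingMap = new HashMap<>();
+ongoingMap.put("ongoing", "" + counter.get());
+manager.sendMessage(
+        new Message(workflowId, "Collection", MessageType.ONGOING, ongoingMap),
+        rabbitOngoingQueue, true, false);
+```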
+
+To run, the collector worker needs the following parameters:
+
+* **hdfsPath**: the path where the sequence file is stored
+* **apidescriptor**: the JSON encoding of the API Descriptor (see the example below)
+* **namenode**: the Name Node URI
+* **userHDFS**: the user who creates the HDFS sequence file
+* **rabbitUser**: the user to connect to RabbitMQ for messaging
+* **rabbitPassWord**: the password to connect to RabbitMQ for messaging
+* **rabbitHost**: the host of the RabbitMQ server
+* **rabbitOngoingQueue**: the name of the ongoing queue
+* **rabbitReportQueue**: the name of the report queue
+* **workflowId**: the identifier of the dnet Workflow
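+
+For example, a minimal **apidescriptor** for the OAI plugin looks like the
+following (the values are the ones used in this module's tests):
+
+```json
+{
+  "id": "oai",
+  "protocol": "oai",
+  "baseUrl": "http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai",
+  "params": { "format": "oai_dc" }
+}
+```
+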
## Plugins
* OAI Plugin
-
-## Api Descriptor
-TODO
-
## Usage
TODO
\ No newline at end of file
diff --git a/dhp-applications/dhp-collector-worker/pom.xml b/dhp-applications/dhp-collector-worker/pom.xml
index fd567aa37..a39eed2b0 100644
--- a/dhp-applications/dhp-collector-worker/pom.xml
+++ b/dhp-applications/dhp-collector-worker/pom.xml
@@ -4,12 +4,13 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<parent>
-		<groupId>org.springframework.boot</groupId>
-		<artifactId>spring-boot-starter-parent</artifactId>
-		<version>2.1.3.RELEASE</version>
+		<groupId>eu.dnetlib.dhp</groupId>
+		<artifactId>dhp-applications</artifactId>
+		<version>1.0.0-SNAPSHOT</version>
 		<relativePath>../</relativePath>
 	</parent>
+
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>eu.dnetlib</groupId>
@@ -18,59 +19,8 @@
-
-	<repositories>
-		<repository>
-			<id>dnet45-releases</id>
-			<name>D-Net 45 releases</name>
-			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
-			<layout>default</layout>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-		</repository>
-		<repository>
-			<id>dnet45-bootstrap-release</id>
-			<name>dnet45 bootstrap release</name>
-			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-bootstrap-release</url>
-			<layout>default</layout>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-		</repository>
-		<repository>
-			<id>cloudera</id>
-			<name>Cloudera Repository</name>
-			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-			<snapshots>
-				<enabled>true</enabled>
-			</snapshots>
-			<releases>
-				<enabled>false</enabled>
-			</releases>
-		</repository>
-	</repositories>
-
 	<build>
 		<plugins>
-			<plugin>
-				<groupId>org.springframework.boot</groupId>
-				<artifactId>spring-boot-maven-plugin</artifactId>
-				<configuration>
-					<executable>true</executable>
-				</configuration>
-			</plugin>
-
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>
@@ -129,12 +79,10 @@
-
 		<dependency>
 			<groupId>eu.dnetlib.dhp</groupId>
 			<artifactId>dhp-common</artifactId>
-			<version>1.0.0-SNAPSHOT</version>
@@ -142,10 +90,6 @@
 			<artifactId>commons-cli</artifactId>
 			<version>1.4</version>
 		</dependency>
-		<dependency>
-			<groupId>org.springframework.boot</groupId>
-			<artifactId>spring-boot-starter</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-client</artifactId>
@@ -154,26 +98,39 @@
 			<groupId>com.fasterxml.jackson.core</groupId>
 			<artifactId>jackson-core</artifactId>
+			<version>2.9.8</version>
 		</dependency>
 		<dependency>
 			<groupId>com.fasterxml.jackson.core</groupId>
 			<artifactId>jackson-annotations</artifactId>
+			<version>2.9.0</version>
 		</dependency>
 		<dependency>
 			<groupId>com.fasterxml.jackson.core</groupId>
 			<artifactId>jackson-databind</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>dom4j</groupId>
-			<artifactId>dom4j</artifactId>
+			<version>2.9.8</version>
 		</dependency>
 		<dependency>
 			<groupId>jaxen</groupId>
 			<artifactId>jaxen</artifactId>
+			<version>1.1.6</version>
 		</dependency>
 		<dependency>
-			<groupId>org.springframework.boot</groupId>
-			<artifactId>spring-boot-starter-test</artifactId>
+			<groupId>dom4j</groupId>
+			<artifactId>dom4j</artifactId>
+			<version>1.6.1</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<version>2.9.8</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>3.0.0</version>
+			<scope>test</scope>
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java
new file mode 100644
index 000000000..ba876c6cf
--- /dev/null
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java
@@ -0,0 +1,111 @@
+package eu.dnetlib.collector.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
+import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.message.MessageType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DnetCollectorWorker {
+
+ private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorker.class);
+
+
+ private final CollectorPluginFactory collectorPluginFactory;
+
+ private final DnetCollectorWorkerArgumentParser argumentParser;
+
+ private final MessageManager manager;
+
+
+ public DnetCollectorWorker(final CollectorPluginFactory collectorPluginFactory, final DnetCollectorWorkerArgumentParser argumentParser, final MessageManager manager) throws DnetCollectorException {
+ this.collectorPluginFactory = collectorPluginFactory;
+ this.argumentParser = argumentParser;
+ this.manager = manager;
+ }
+
+
+ public void collect() throws DnetCollectorException {
+ try {
+ final ObjectMapper jsonMapper = new ObjectMapper();
+ final ApiDescriptor api = jsonMapper.readValue(argumentParser.getJson(), ApiDescriptor.class);
+
+ final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
+
+ final String hdfsuri = argumentParser.getNameNode();
+
+ // ====== Init HDFS File System Object
+ Configuration conf = new Configuration();
+ // Set FileSystem URI
+ conf.set("fs.defaultFS", hdfsuri);
+        // set the FileSystem implementations explicitly: a shaded jar can lose the
+        // ServiceLoader bindings, causing "No FileSystem for scheme: hdfs" errors
+ conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+ conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+ System.setProperty("HADOOP_USER_NAME", argumentParser.getUser());
+ System.setProperty("hadoop.home.dir", "/");
+ //Get the filesystem - HDFS
+ FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
+ Path hdfswritepath = new Path(argumentParser.getHdfsPath());
+
+ log.info("Created path " + hdfswritepath.toString());
+
+        final Map<String, String> ongoingMap = new HashMap<>();
+        final Map<String, String> reportMap = new HashMap<>();
+ final AtomicInteger counter = new AtomicInteger(0);
+ try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
+ SequenceFile.Writer.valueClass(Text.class))) {
+ final IntWritable key = new IntWritable(counter.get());
+ final Text value = new Text();
+ plugin.collect(api).forEach(content -> {
+
+ key.set(counter.getAndIncrement());
+ value.set(content);
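+                // every 10 collected records, send a progress (ONGOING) message on the queue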
+ if (counter.get() % 10 == 0) {
+ try {
+ ongoingMap.put("ongoing", "" + counter.get());
+ log.debug("Sending message: "+ manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.getRabbitOngoingQueue(), true, false));
+ } catch (Exception e) {
+ log.error("Error on sending message ", e);
+ }
+ }
+ try {
+ writer.append(key, value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ });
+ }
+ ongoingMap.put("ongoing", "" + counter.get());
+ manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.getRabbitOngoingQueue(), true, false);
+ reportMap.put("collected", "" + counter.get());
+ manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.REPORT, reportMap), argumentParser.getRabbitOngoingQueue(), true, false);
+ manager.close();
+ } catch (Throwable e) {
+ throw new DnetCollectorException("Error on collecting ",e);
+ }
+ }
+
+
+
+
+
+}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
index 8f4f46513..68c119fa8 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
@@ -1,237 +1,43 @@
package eu.dnetlib.collector.worker;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
-import eu.dnetlib.collector.worker.utils.CollectorPluginEnumerator;
-import eu.dnetlib.message.Message;
+import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.MessageManager;
-import eu.dnetlib.message.MessageType;
-import org.apache.commons.cli.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
/**
- *
 * DnetCollectorWorkerApplication is the main class responsible for starting
 * the D-Net collection into HDFS.
 * This module is executed on the Hadoop cluster, taking as input the parameters
 * that tell it which collector plugin to use and into which HDFS path to store the data
*
- *
* @author Sandro La Bruzzo
*/
-@SpringBootApplication
-public class DnetCollectorWorkerApplication implements CommandLineRunner {
- private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorkerApplication.class);
+public class DnetCollectorWorkerApplication {
- @Autowired
- private CollectorPluginEnumerator collectorPluginEnumerator;
+ private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorkerApplication.class);
- /**
- *
- * @param args
- */
- public static void main(final String[] args) {
- SpringApplication.run(DnetCollectorWorkerApplication.class, args);
- }
+ private static CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
- /**
- * This module expect two arguments:
- * path hdfs where store the sequential file.
- * Json serialization of {@link ApiDescriptor}
- */
- @Override
- public void run(final String... args) throws Exception {
- Options options = new Options();
- options.addOption(Option.builder("p")
- .longOpt("hdfsPath")
- .required(true)
- .desc("the path where storing the sequential file")
- .hasArg() // This option has an argument.
- .build());
- options.addOption(Option.builder("a")
- .longOpt("apidescriptor")
- .required(true)
- .desc("the Json enconding of the API Descriptor")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("n")
- .longOpt("namenode")
- .required(true)
- .desc("the Name Node URI")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("u")
- .longOpt("userHDFS")
- .required(true)
- .desc("the user wich create the hdfs seq file")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("ru")
- .longOpt("rabbitUser")
- .required(true)
- .desc("the user to connect with RabbitMq for messaging")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("rp")
- .longOpt("rabbitPassWord")
- .required(true)
- .desc("the password to connect with RabbitMq for messaging")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("rh")
- .longOpt("rabbitHost")
- .required(true)
- .desc("the host of the RabbitMq server")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("ro")
- .longOpt("rabbitOngoingQueue")
- .required(true)
- .desc("the name of the ongoing queue")
- .hasArg() // This option has an argument.
- .build());
-
- options.addOption(Option.builder("rr")
- .longOpt("rabbitReportQueue")
- .required(true)
- .desc("the name of the report queue")
- .hasArg() // This option has an argument.
- .build());
+ private static DnetCollectorWorkerArgumentParser argumentParser = new DnetCollectorWorkerArgumentParser();
- options.addOption(Option.builder("w")
- .longOpt("workflowId")
- .required(true)
- .desc("the identifier of the dnet Workflow")
- .hasArg() // This option has an argument.
- .build());
+ /**
+ * @param args
+ */
+ public static void main(final String[] args) throws Exception {
- CommandLineParser parser = new DefaultParser();
- String hdfsPath ;
- String json;
- String nameNode;
- String user;
- String rabbitUser;
- String rabbitPassword;
- String rabbitHost;
- String rabbitOngoingQueue;
- String rabbitReportQueue;
- String workflowId;
-
- try {
- CommandLine cmd = parser.parse(options, args);
- hdfsPath = cmd.getOptionValue("p");
- json = cmd.getOptionValue("a");
- nameNode = cmd.getOptionValue("n");
- user = cmd.getOptionValue("u");
- rabbitUser = cmd.getOptionValue("ru");
- rabbitPassword = cmd.getOptionValue("rp");
- rabbitHost = cmd.getOptionValue("rh");
- rabbitOngoingQueue = cmd.getOptionValue("ro");
- rabbitReportQueue = cmd.getOptionValue("rr");
- workflowId = cmd.getOptionValue("w");
- } catch (ParseException e) {
- System.out.println("Error on executing collector worker, missing parameter:");
- e.printStackTrace();
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("dhp-collector-worker", options);
- return;
- }
- log.info("hdfsPath ="+hdfsPath);
- log.info("json = "+json);
-
- final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
+ argumentParser.parseArgument(args);
+ log.info("hdfsPath =" + argumentParser.getHdfsPath());
+ log.info("json = " + argumentParser.getJson());
+ final MessageManager manager = new MessageManager(argumentParser.getRabbitHost(), argumentParser.getRabbitUser(), argumentParser.getRabbitPassword(), false, false, null);
+ final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager);
+ worker.collect();
- final ObjectMapper jsonMapper = new ObjectMapper();
- final ApiDescriptor api = jsonMapper.readValue(json, ApiDescriptor.class);
+ }
- final CollectorPlugin plugin = collectorPluginEnumerator.getPluginByProtocol(api.getProtocol());
-
- final String hdfsuri =nameNode;
-
- // ====== Init HDFS File System Object
- Configuration conf = new Configuration();
- // Set FileSystem URI
- conf.set("fs.defaultFS", hdfsuri);
- // Because of Maven
- conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
- conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
-
- System.setProperty("HADOOP_USER_NAME", user);
- System.setProperty("hadoop.home.dir", "/");
- //Get the filesystem - HDFS
- FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
- Path hdfswritepath = new Path(hdfsPath);
-
- log.info("Created path "+hdfswritepath.toString());
-
-        final Map<String, String> ongoingMap = new HashMap<>();
-        final Map<String, String> reportMap = new HashMap<>();
- final AtomicInteger counter = new AtomicInteger(0);
- try(SequenceFile.Writer writer = SequenceFile.createWriter(conf,
- SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class),
- SequenceFile.Writer.valueClass(Text.class))) {
-
-
- final IntWritable key = new IntWritable(counter.get());
- final Text value = new Text();
-
-
-
- plugin.collect(api).forEach(content -> {
-
- key.set(counter.getAndIncrement());
- value.set(content);
- if (counter.get() % 10 ==0) {
- try {
- ongoingMap.put("ongoing", ""+counter.get());
- manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
- } catch (Exception e) {
- log.error("Error on sending message ", e);
- }
- }
-
- try {
- writer.append(key, value);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- });
-
- }
- ongoingMap.put("ongoing", ""+counter.get());
- manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false);
- reportMap.put("collected", ""+counter.get());
- manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false);
- manager.close();
- }
}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerArgumentParser.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerArgumentParser.java
new file mode 100644
index 000000000..9f34f898b
--- /dev/null
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerArgumentParser.java
@@ -0,0 +1,193 @@
+package eu.dnetlib.collector.worker;
+
+import org.apache.commons.cli.*;
+
+public class DnetCollectorWorkerArgumentParser {
+
+ private final Options options;
+    private String hdfsPath;
+ private String json;
+ private String nameNode;
+ private String user;
+ private String rabbitUser;
+ private String rabbitPassword;
+ private String rabbitHost;
+ private String rabbitOngoingQueue;
+ private String rabbitReportQueue;
+ private String workflowId;
+
+ public DnetCollectorWorkerArgumentParser(){
+ options = new Options();
+ options.addOption(Option.builder("p")
+ .longOpt("hdfsPath")
+ .required(true)
+ .desc("the path where storing the sequential file")
+ .hasArg() // This option has an argument.
+ .build());
+ options.addOption(Option.builder("a")
+ .longOpt("apidescriptor")
+ .required(true)
+ .desc("the Json enconding of the API Descriptor")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("n")
+ .longOpt("namenode")
+ .required(true)
+ .desc("the Name Node URI")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("u")
+ .longOpt("userHDFS")
+ .required(true)
+ .desc("the user wich create the hdfs seq file")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("ru")
+ .longOpt("rabbitUser")
+ .required(true)
+ .desc("the user to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rp")
+ .longOpt("rabbitPassWord")
+ .required(true)
+ .desc("the password to connect with RabbitMq for messaging")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rh")
+ .longOpt("rabbitHost")
+ .required(true)
+ .desc("the host of the RabbitMq server")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("ro")
+ .longOpt("rabbitOngoingQueue")
+ .required(true)
+ .desc("the name of the ongoing queue")
+ .hasArg() // This option has an argument.
+ .build());
+
+ options.addOption(Option.builder("rr")
+ .longOpt("rabbitReportQueue")
+ .required(true)
+ .desc("the name of the report queue")
+ .hasArg() // This option has an argument.
+ .build());
+ options.addOption(Option.builder("w")
+ .longOpt("workflowId")
+ .required(true)
+ .desc("the identifier of the dnet Workflow")
+ .hasArg() // This option has an argument.
+ .build());
+ }
+
+ public void parseArgument(final String[] args) throws DnetCollectorException {
+ try {
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse(options, args);
+ hdfsPath = cmd.getOptionValue("p");
+ json = cmd.getOptionValue("a");
+ nameNode = cmd.getOptionValue("n");
+ user = cmd.getOptionValue("u");
+ rabbitUser = cmd.getOptionValue("ru");
+ rabbitPassword = cmd.getOptionValue("rp");
+ rabbitHost = cmd.getOptionValue("rh");
+ rabbitOngoingQueue = cmd.getOptionValue("ro");
+ rabbitReportQueue = cmd.getOptionValue("rr");
+ workflowId = cmd.getOptionValue("w");
+ } catch (Throwable e){
+ throw new DnetCollectorException("Error during parsing arguments ",e);
+ }
+
+ }
+
+ public Options getOptions() {
+ return options;
+ }
+
+ public String getHdfsPath() {
+ return hdfsPath;
+ }
+
+ public void setHdfsPath(String hdfsPath) {
+ this.hdfsPath = hdfsPath;
+ }
+
+ public String getJson() {
+ return json;
+ }
+
+ public void setJson(String json) {
+ this.json = json;
+ }
+
+ public String getNameNode() {
+ return nameNode;
+ }
+
+ public void setNameNode(String nameNode) {
+ this.nameNode = nameNode;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getRabbitUser() {
+ return rabbitUser;
+ }
+
+ public void setRabbitUser(String rabbitUser) {
+ this.rabbitUser = rabbitUser;
+ }
+
+ public String getRabbitPassword() {
+ return rabbitPassword;
+ }
+
+ public void setRabbitPassword(String rabbitPassword) {
+ this.rabbitPassword = rabbitPassword;
+ }
+
+ public String getRabbitHost() {
+ return rabbitHost;
+ }
+
+ public void setRabbitHost(String rabbitHost) {
+ this.rabbitHost = rabbitHost;
+ }
+
+ public String getRabbitOngoingQueue() {
+ return rabbitOngoingQueue;
+ }
+
+ public void setRabbitOngoingQueue(String rabbitOngoingQueue) {
+ this.rabbitOngoingQueue = rabbitOngoingQueue;
+ }
+
+ public String getRabbitReportQueue() {
+ return rabbitReportQueue;
+ }
+
+ public void setRabbitReportQueue(String rabbitReportQueue) {
+ this.rabbitReportQueue = rabbitReportQueue;
+ }
+
+ public String getWorkflowId() {
+ return workflowId;
+ }
+
+ public void setWorkflowId(String workflowId) {
+ this.workflowId = workflowId;
+ }
+}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
index 7c4bec192..1dea7f627 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiCollectorPlugin.java
@@ -7,10 +7,6 @@ import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
import com.google.common.base.Splitter;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -18,10 +14,8 @@ import com.google.common.collect.Lists;
import eu.dnetlib.collector.worker.DnetCollectorException;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
-import eu.dnetlib.collector.worker.utils.DnetWorkerCollector;
-@Component
-@DnetWorkerCollector("oai")
+
public class OaiCollectorPlugin implements CollectorPlugin {
private static final String FORMAT_PARAM = "format";
@@ -29,7 +23,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
private static final Object OAI_FROM_DATE_PARAM = "fromDate";
private static final Object OAI_UNTIL_DATE_PARAM = "untilDate";
- @Autowired
+
private OaiIteratorFactory oaiIteratorFactory;
@Override
@@ -58,9 +52,16 @@ public class OaiCollectorPlugin implements CollectorPlugin {
if (untilDate != null && !untilDate.matches("\\d{4}-\\d{2}-\\d{2}")) { throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + untilDate); }
 		final Iterator<Iterator<String>> iters = sets.stream()
- .map(set -> oaiIteratorFactory.newIterator(baseUrl, mdFormat, set, fromDate, untilDate))
+ .map(set -> getOaiIteratorFactory().newIterator(baseUrl, mdFormat, set, fromDate, untilDate))
.iterator();
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(Iterators.concat(iters), Spliterator.ORDERED), false);
}
+
+ public OaiIteratorFactory getOaiIteratorFactory() {
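+        // lazily instantiate the factory (replaces the Spring @Autowired injection removed in this commit)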
+ if (oaiIteratorFactory == null){
+ oaiIteratorFactory = new OaiIteratorFactory();
+ }
+ return oaiIteratorFactory;
+ }
}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java
index 16af15191..ef7ce222f 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/plugins/oai/OaiIteratorFactory.java
@@ -1,20 +1,24 @@
package eu.dnetlib.collector.worker.plugins.oai;
import java.util.Iterator;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
import eu.dnetlib.collector.worker.utils.HttpConnector;
-@Component
+
public class OaiIteratorFactory {
- @Autowired
+
private HttpConnector httpConnector;
 	public Iterator<String> newIterator(final String baseUrl, final String mdFormat, final String set, final String fromDate, final String untilDate) {
- return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, httpConnector);
+ return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector());
}
+ private HttpConnector getHttpConnector() {
+ if (httpConnector== null)
+ httpConnector = new HttpConnector();
+ return httpConnector;
+ }
+
+
+
}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java
deleted file mode 100644
index 3f5b245ef..000000000
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginEnumerator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package eu.dnetlib.collector.worker.utils;
-
-import java.util.List;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
-
-@Component
-public class CollectorPluginEnumerator {
-
- @Autowired
-    private List<CollectorPlugin> plugins;
-
- public CollectorPlugin getPluginByProtocol(final String protocol) {
- return plugins.stream()
- .filter(p -> p.getClass().isAnnotationPresent(DnetWorkerCollector.class))
- .filter(p -> p.getClass().getAnnotation(DnetWorkerCollector.class).value().equalsIgnoreCase(protocol))
- .findFirst()
- .get();
- }
-
-}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginFactory.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginFactory.java
new file mode 100644
index 000000000..4c5561778
--- /dev/null
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/CollectorPluginFactory.java
@@ -0,0 +1,20 @@
+package eu.dnetlib.collector.worker.utils;
+
+import eu.dnetlib.collector.worker.DnetCollectorException;
+import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
+import eu.dnetlib.collector.worker.plugins.oai.OaiCollectorPlugin;
+
+
+public class CollectorPluginFactory {
+
+ public CollectorPlugin getPluginByProtocol(final String protocol) throws DnetCollectorException {
+ if (protocol==null) throw new DnetCollectorException("protocol cannot be null");
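+        // resolve the plugin from the protocol name; new plugins are registered by adding a case here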
+ switch (protocol.toLowerCase().trim()){
+ case "oai":
+ return new OaiCollectorPlugin();
+ default:
+ throw new DnetCollectorException("UNknown protocol");
+ }
+
+ }
+}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java
deleted file mode 100644
index 28891c84f..000000000
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/DnetWorkerCollector.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package eu.dnetlib.collector.worker.utils;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface DnetWorkerCollector {
-
- String value();
-
-}
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java
index 8a24381e5..73a416e18 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/utils/HttpConnector.java
@@ -21,11 +21,11 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.springframework.stereotype.Component;
+
import eu.dnetlib.collector.worker.DnetCollectorException;
-@Component
+
public class HttpConnector {
private static final Log log = LogFactory.getLog(HttpConnector.class);
@@ -48,7 +48,7 @@ public class HttpConnector {
* @param requestUrl
* the URL
* @return the content of the downloaded resource
- * @throws CollectorServiceException
+ * @throws DnetCollectorException
* when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl) throws DnetCollectorException {
@@ -61,7 +61,7 @@ public class HttpConnector {
* @param requestUrl
* the URL
* @return the content of the downloaded resource as InputStream
- * @throws CollectorServiceException
+ * @throws DnetCollectorException
* when retrying more than maxNumberOfRetry times
*/
public InputStream getInputSourceAsStream(final String requestUrl) throws DnetCollectorException {
diff --git a/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java
index 24f78f318..e0b7f0025 100644
--- a/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java
+++ b/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java
@@ -1,45 +1,88 @@
package eu.dnetlib.collector.worker;
-import static org.junit.Assert.assertNotNull;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.ApplicationContext;
-import org.springframework.test.context.junit4.SpringRunner;
-
import com.fasterxml.jackson.databind.ObjectMapper;
-
import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.collector.worker.utils.CollectorPluginEnumerator;
+import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.message.Message;
+import eu.dnetlib.message.MessageManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.*;
+
-@RunWith(SpringRunner.class)
-@SpringBootTest
public class DnetCollectorWorkerApplicationTests {
- @Autowired
- private ApplicationContext ctx;
- @Test
- public void testFindPlugin() throws Exception {
- final CollectorPluginEnumerator collectorPluginEnumerator = ctx.getBean(CollectorPluginEnumerator.class);
- assertNotNull(collectorPluginEnumerator.getPluginByProtocol("oai"));
- assertNotNull(collectorPluginEnumerator.getPluginByProtocol("OAI"));
- }
+ private DnetCollectorWorkerArgumentParser argumentParser = mock(DnetCollectorWorkerArgumentParser.class);
+ private MessageManager messageManager = mock(MessageManager.class);
- @Test
- public void testCollectionOAI() throws Exception {
- final ApiDescriptor api = new ApiDescriptor();
- api.setId("oai");
- api.setProtocol("oai");
- api.setBaseUrl("http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai");
- api.getParams().put("format", "oai_dc");
+ private DnetCollectorWorker worker;
+ @Before
+ public void setup() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ final String apiJson = mapper.writeValueAsString(getApi());
+ when(argumentParser.getJson()).thenReturn(apiJson);
+ when(argumentParser.getNameNode()).thenReturn("file://tmp/test.seq");
+ when(argumentParser.getHdfsPath()).thenReturn("/tmp/file.seq");
+ when(argumentParser.getUser()).thenReturn("sandro");
+ when(argumentParser.getWorkflowId()).thenReturn("sandro");
+ when(argumentParser.getRabbitOngoingQueue()).thenReturn("sandro");
- ObjectMapper mapper = new ObjectMapper();
+ when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(),anyBoolean())).thenAnswer(a -> {
+ System.out.println("sent message: "+a.getArguments()[0]);
+ return true;
+ });
+ when(messageManager.sendMessage(any(Message.class), anyString())).thenAnswer(a -> {
+ System.out.println("Called");
+ return true;
+ });
+ worker = new DnetCollectorWorker(new CollectorPluginFactory(), argumentParser, messageManager);
+ }
- System.out.println(mapper.writeValueAsString(api));
- }
+
+ @After
+    public void tearDown() {
+        // remove the sequence file written by testFeeding (the hdfsPath returned by the mocked parser)
+        File f = new File("/tmp/file.seq");
+        f.delete();
+ }
+
+
+ @Test
+ public void testFindPlugin() throws Exception {
+ final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
+ assertNotNull(collectorPluginEnumerator.getPluginByProtocol("oai"));
+ assertNotNull(collectorPluginEnumerator.getPluginByProtocol("OAI"));
+ }
+
+
+ @Test
+ public void testCollectionOAI() throws Exception {
+ final ApiDescriptor api = new ApiDescriptor();
+ api.setId("oai");
+ api.setProtocol("oai");
+ api.setBaseUrl("http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai");
+ api.getParams().put("format", "oai_dc");
+ ObjectMapper mapper = new ObjectMapper();
+ assertNotNull(mapper.writeValueAsString(api));
+ }
+
+ @Test
+ public void testFeeding() throws Exception {
+ worker.collect();
+ }
+
+ private ApiDescriptor getApi() {
+ final ApiDescriptor api = new ApiDescriptor();
+ api.setId("oai");
+ api.setProtocol("oai");
+ api.setBaseUrl("http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai");
+ api.getParams().put("format", "oai_dc");
+ return api;
+ }
}
diff --git a/dhp-applications/dhp-collector-worker/src/test/resources/log4j.properties b/dhp-applications/dhp-collector-worker/src/test/resources/log4j.properties
new file mode 100644
index 000000000..fd3cc24e1
--- /dev/null
+++ b/dhp-applications/dhp-collector-worker/src/test/resources/log4j.properties
@@ -0,0 +1,14 @@
+### Root Level ###
+log4j.rootLogger=WARN, CONSOLE
+
+### Configuration for the CONSOLE appender ###
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n
+
+org.apache.cxf.Logger=org.apache.cxf.common.logging.Log4jLogger
+
+### Application Level ###
+log4j.logger.eu.dnetlib=INFO
+log4j.logger.eu.dnetlib.collector.worker.DnetCollectorWorker=DEBUG
+