diff --git a/dhp-applications/dhp-collector-worker/pom.xml b/dhp-applications/dhp-collector-worker/pom.xml
index af8291651..8dec778e6 100644
--- a/dhp-applications/dhp-collector-worker/pom.xml
+++ b/dhp-applications/dhp-collector-worker/pom.xml
@@ -67,6 +67,13 @@
jaxen
jaxen
+
+
+ com.rabbitmq
+ amqp-client
+ 5.6.0
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
index c2854b8f2..d654a30e8 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java
@@ -57,12 +57,8 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
public void run(final String... args) throws Exception {
if (args.length == 0) { return; }
if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); }
-
//TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html
-
-
-
final String hdfsPath = args[0];
log.info("hdfsPath ="+hdfsPath);
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 2de3dc310..cac0f08c2 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -29,6 +29,21 @@
2.2
provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.6.5
+ provided
+
+
+
+
+
+ com.rabbitmq
+ amqp-client
+ 5.6.0
+
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/message/Message.java
new file mode 100644
index 000000000..3767bd026
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/message/Message.java
@@ -0,0 +1,82 @@
+package eu.dnetlib.message;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class Message {
+
+ private String workflowId;
+
+ private String jobName;
+
+ private MessageType type;
+
+ private Map body;
+
+
+ public static Message fromJson(final String json) throws IOException {
+ final ObjectMapper jsonMapper = new ObjectMapper();
+ return jsonMapper.readValue(json, Message.class);
+
+
+ }
+
+
+ public Message() {
+
+
+
+ }
+
+ public Message(String workflowId, String jobName, MessageType type, Map body) {
+ this.workflowId = workflowId;
+ this.jobName = jobName;
+ this.type = type;
+ this.body = body;
+ }
+
+ public String getWorkflowId() {
+ return workflowId;
+ }
+
+ public void setWorkflowId(String workflowId) {
+ this.workflowId = workflowId;
+ }
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public MessageType getType() {
+ return type;
+ }
+
+ public void setType(MessageType type) {
+ this.type = type;
+ }
+
+ public Map getBody() {
+ return body;
+ }
+
+ public void setBody(Map body) {
+ this.body = body;
+ }
+
+ @Override
+ public String toString() {
+ final ObjectMapper jsonMapper = new ObjectMapper();
+ try {
+ return jsonMapper.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ return null;
+ }
+ }
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
new file mode 100644
index 000000000..ee58f11e8
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.message;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class MessageConsumer extends DefaultConsumer {
+
+ final LinkedBlockingQueue queueMessages;
+
+
+ /**
+ * Constructs a new instance and records its association to the passed-in channel.
+ *
+ * @param channel the channel to which this consumer is attached
+ * @param queueMessages
+ */
+ public MessageConsumer(Channel channel, LinkedBlockingQueue queueMessages) {
+ super(channel);
+ this.queueMessages = queueMessages;
+ }
+
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ final String json = new String(body, StandardCharsets.UTF_8);
+ Message message = Message.fromJson(json);
+ try {
+ this.queueMessages.put(message);
+ System.out.println("Receiving Message "+message);
+ } catch (InterruptedException e) {
+ if (message.getType()== MessageType.REPORT)
+ throw new RuntimeException("Error on sending message");
+ else {
+ //TODO LOGGING EXCEPTION
+ }
+ }
+ }
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
new file mode 100644
index 000000000..8db9f3ce1
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
@@ -0,0 +1,82 @@
+package eu.dnetlib.message;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import sun.rmi.runtime.Log;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class MessageManager {
+
+ private final String messageHost;
+
+ private final String username;
+
+ private final String password;
+
+
+ private boolean durable;
+
+ private boolean autodelete;
+
+ final private LinkedBlockingQueue queueMessages;
+
+ public MessageManager(String messageHost, String username, String password, final LinkedBlockingQueue queueMessages) {
+ this.queueMessages = queueMessages;
+ this.messageHost = messageHost;
+ this.username = username;
+ this.password = password;
+ }
+
+
+ public MessageManager(String messageHost, String username, String password, boolean durable, boolean autodelete, final LinkedBlockingQueue queueMessages) {
+ this.queueMessages = queueMessages;
+ this.messageHost = messageHost;
+ this.username = username;
+ this.password = password;
+
+ this.durable = durable;
+ this.autodelete = autodelete;
+ }
+ private Channel createChannel(final String queueName, final boolean durable, final boolean autodelete ) throws Exception {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setHost(this.messageHost);
+ factory.setUsername(this.username);
+ factory.setPassword(this.password);
+ Connection connection = factory.newConnection();
+ Map args = new HashMap();
+ args.put("x-message-ttl", 10000);
+ Channel channel = connection.createChannel();
+ channel.queueDeclare(queueName, durable, false, this.autodelete, args);
+ return channel;
+ }
+ public boolean sendMessage(final Message message, String queueName) throws Exception {
+ try (Channel channel = createChannel(queueName, this.durable, this.autodelete)) {
+
+ channel.basicPublish("", queueName,null, message.toString().getBytes());
+ return true;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public boolean sendMessage(final Message message, String queueName, boolean durable_var, boolean autodelete_var) throws Exception {
+ try (Channel channel = createChannel(queueName, durable_var, autodelete_var)) {
+
+ channel.basicPublish("", queueName,null, message.toString().getBytes());
+ return true;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public void startConsumingMessage(final String queueName, final boolean durable, final boolean autodelete) throws Exception{
+ Channel channel = createChannel(queueName, durable, autodelete);
+ channel.basicConsume(queueName, false, new MessageConsumer(channel,queueMessages));
+ }
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java
new file mode 100644
index 000000000..c2440c3fe
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java
@@ -0,0 +1,8 @@
+package eu.dnetlib.message;
+
+public enum MessageType {
+
+ ONGOING,
+ REPORT
+
+}
diff --git a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
new file mode 100644
index 000000000..baed36304
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
@@ -0,0 +1,83 @@
+package eu.dnetlib.message;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class MessageTest {
+
+ @Test
+ public void fromJsonTest() throws IOException {
+ Message m = new Message();
+ m.setWorkflowId("wId");
+ m.setType(MessageType.ONGOING);
+ m.setJobName("Collection");
+ Map body= new HashMap<>();
+ body.put("parsedItem", "300");
+ body.put("ExecutionTime", "30s");
+
+ m.setBody(body);
+ System.out.println("m = " + m);
+ Message m1 = Message.fromJson(m.toString());
+ assertEquals(m1.getWorkflowId(), m.getWorkflowId());
+ assertEquals(m1.getType(), m.getType());
+ assertEquals(m1.getJobName(), m.getJobName());
+
+ assertNotNull(m1.getBody());
+ m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it)));
+ assertEquals(m1.getJobName(), m.getJobName());
+ }
+
+ @Test
+ public void toStringTest() {
+ final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
+ Message m = new Message();
+ m.setWorkflowId("wId");
+ m.setType(MessageType.ONGOING);
+ m.setJobName("Collection");
+ Map body= new HashMap<>();
+ body.put("parsedItem", "300");
+ body.put("ExecutionTime", "30s");
+
+ m.setBody(body);
+
+ assertEquals(expectedJson,m.toString());
+
+
+ }
+
+
+ @Test
+ public void sendMessageTest() throws Exception {
+
+ final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
+ Message m = new Message();
+ m.setWorkflowId("wf_20190405_105048_275");
+ m.setType(MessageType.ONGOING);
+ m.setJobName("Collection");
+ Map body= new HashMap<>();
+ body.put("progressCount", "100");
+ body.put("ExecutionTime", "30s");
+
+ m.setBody(body);
+
+ MessageManager mm = new MessageManager("broker1-dev-dnet.d4science.org","r_admin", "9g8fed7gpohef9y84th98h", false,false, null);
+
+
+
+
+
+ mm.sendMessage(m, "dev_ongoing");
+
+ m.setType(MessageType.REPORT);
+
+ body.put("mdStoreSize", "368");
+
+
+ mm.sendMessage(m, "dev_report", true, false);
+ }
+}
\ No newline at end of file