diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 6eb2e0358..a8607a9b3 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -53,11 +53,6 @@
com.fasterxml.jackson.core
jackson-databind
-
-
- com.rabbitmq
- amqp-client
-
net.sf.saxon
Saxon-HE
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/message/Message.java
deleted file mode 100644
index fc1c38291..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/Message.java
+++ /dev/null
@@ -1,76 +0,0 @@
-
-package eu.dnetlib.message;
-
-import java.io.IOException;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class Message {
-
- private String workflowId;
-
- private String jobName;
-
- private MessageType type;
-
- private Map body;
-
- public static Message fromJson(final String json) throws IOException {
- final ObjectMapper jsonMapper = new ObjectMapper();
- return jsonMapper.readValue(json, Message.class);
- }
-
- public Message() {
- }
-
- public Message(String workflowId, String jobName, MessageType type, Map body) {
- this.workflowId = workflowId;
- this.jobName = jobName;
- this.type = type;
- this.body = body;
- }
-
- public String getWorkflowId() {
- return workflowId;
- }
-
- public void setWorkflowId(String workflowId) {
- this.workflowId = workflowId;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public MessageType getType() {
- return type;
- }
-
- public void setType(MessageType type) {
- this.type = type;
- }
-
- public Map getBody() {
- return body;
- }
-
- public void setBody(Map body) {
- this.body = body;
- }
-
- @Override
- public String toString() {
- final ObjectMapper jsonMapper = new ObjectMapper();
- try {
- return jsonMapper.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- return null;
- }
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
deleted file mode 100644
index fb3f0bd95..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-
-package eu.dnetlib.message;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-
-public class MessageConsumer extends DefaultConsumer {
-
- final LinkedBlockingQueue queueMessages;
-
- /**
- * Constructs a new instance and records its association to the passed-in channel.
- *
- * @param channel the channel to which this consumer is attached
- * @param queueMessages
- */
- public MessageConsumer(Channel channel, LinkedBlockingQueue queueMessages) {
- super(channel);
- this.queueMessages = queueMessages;
- }
-
- @Override
- public void handleDelivery(
- String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
- throws IOException {
- final String json = new String(body, StandardCharsets.UTF_8);
- Message message = Message.fromJson(json);
- try {
- this.queueMessages.put(message);
- System.out.println("Receiving Message " + message);
- } catch (InterruptedException e) {
- if (message.getType() == MessageType.REPORT)
- throw new RuntimeException("Error on sending message");
- else {
- // TODO LOGGING EXCEPTION
- }
- } finally {
- getChannel().basicAck(envelope.getDeliveryTag(), false);
- }
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
deleted file mode 100644
index 5ca79f3cc..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-
-package eu.dnetlib.message;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
-public class MessageManager {
-
- private final String messageHost;
-
- private final String username;
-
- private final String password;
-
- private Connection connection;
-
- private final Map channels = new HashMap<>();
-
- private boolean durable;
-
- private boolean autodelete;
-
- private final LinkedBlockingQueue queueMessages;
-
- public MessageManager(
- String messageHost,
- String username,
- String password,
- final LinkedBlockingQueue queueMessages) {
- this.queueMessages = queueMessages;
- this.messageHost = messageHost;
- this.username = username;
- this.password = password;
- }
-
- public MessageManager(
- String messageHost,
- String username,
- String password,
- boolean durable,
- boolean autodelete,
- final LinkedBlockingQueue queueMessages) {
- this.queueMessages = queueMessages;
- this.messageHost = messageHost;
- this.username = username;
- this.password = password;
-
- this.durable = durable;
- this.autodelete = autodelete;
- }
-
- private Connection createConnection() throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(this.messageHost);
- factory.setUsername(this.username);
- factory.setPassword(this.password);
- return factory.newConnection();
- }
-
- private Channel createChannel(
- final Connection connection,
- final String queueName,
- final boolean durable,
- final boolean autodelete)
- throws Exception {
- Map args = new HashMap<>();
- args.put("x-message-ttl", 10000);
- Channel channel = connection.createChannel();
- channel.queueDeclare(queueName, durable, false, this.autodelete, args);
- return channel;
- }
-
- private Channel getOrCreateChannel(final String queueName, boolean durable, boolean autodelete)
- throws Exception {
- if (channels.containsKey(queueName)) {
- return channels.get(queueName);
- }
-
- if (this.connection == null) {
- this.connection = createConnection();
- }
- channels.put(queueName, createChannel(this.connection, queueName, durable, autodelete));
- return channels.get(queueName);
- }
-
- public void close() throws IOException {
- channels
- .values()
- .forEach(
- ch -> {
- try {
- ch.close();
- } catch (Exception e) {
- // TODO LOG
- }
- });
-
- this.connection.close();
- }
-
- public boolean sendMessage(final Message message, String queueName) throws Exception {
- try {
- Channel channel = getOrCreateChannel(queueName, this.durable, this.autodelete);
- channel.basicPublish("", queueName, null, message.toString().getBytes());
- return true;
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- }
-
- public boolean sendMessage(
- final Message message, String queueName, boolean durable_var, boolean autodelete_var)
- throws Exception {
- try {
- Channel channel = getOrCreateChannel(queueName, durable_var, autodelete_var);
- channel.basicPublish("", queueName, null, message.toString().getBytes());
- return true;
- } catch (Throwable e) {
- throw new RuntimeException(e);
- }
- }
-
- public void startConsumingMessage(
- final String queueName, final boolean durable, final boolean autodelete) throws Exception {
-
- Channel channel = createChannel(createConnection(), queueName, durable, autodelete);
- channel.basicConsume(queueName, false, new MessageConsumer(channel, queueMessages));
- }
-}
diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java
deleted file mode 100644
index 72cbda252..000000000
--- a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java
+++ /dev/null
@@ -1,6 +0,0 @@
-
-package eu.dnetlib.message;
-
-public enum MessageType {
- ONGOING, REPORT
-}
diff --git a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
deleted file mode 100644
index 442f7b5c2..000000000
--- a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-
-package eu.dnetlib.message;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.jupiter.api.Test;
-
-public class MessageTest {
-
- @Test
- public void fromJsonTest() throws IOException {
- Message m = new Message();
- m.setWorkflowId("wId");
- m.setType(MessageType.ONGOING);
- m.setJobName("Collection");
- Map body = new HashMap<>();
- body.put("parsedItem", "300");
- body.put("ExecutionTime", "30s");
-
- m.setBody(body);
- System.out.println("m = " + m);
- Message m1 = Message.fromJson(m.toString());
- assertEquals(m1.getWorkflowId(), m.getWorkflowId());
- assertEquals(m1.getType(), m.getType());
- assertEquals(m1.getJobName(), m.getJobName());
-
- assertNotNull(m1.getBody());
- m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it)));
- assertEquals(m1.getJobName(), m.getJobName());
- }
-
- @Test
- public void toStringTest() {
- final String expectedJson = "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
- Message m = new Message();
- m.setWorkflowId("wId");
- m.setType(MessageType.ONGOING);
- m.setJobName("Collection");
- Map body = new HashMap<>();
- body.put("parsedItem", "300");
- body.put("ExecutionTime", "30s");
-
- m.setBody(body);
-
- assertEquals(expectedJson, m.toString());
- }
-}
diff --git a/pom.xml b/pom.xml
index 3e0626aed..cfe1edfbd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -374,11 +374,6 @@
provided
-
- com.rabbitmq
- amqp-client
- 5.6.0
-
com.jayway.jsonpath
json-path