From d62ea1490d494393a730cf10ec61d592ca21e4a5 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 2 Feb 2021 10:53:19 +0100 Subject: [PATCH] cleaned up RabbitMQ stuff --- dhp-common/pom.xml | 5 - .../main/java/eu/dnetlib/message/Message.java | 76 ---------- .../eu/dnetlib/message/MessageConsumer.java | 47 ------ .../eu/dnetlib/message/MessageManager.java | 136 ------------------ .../java/eu/dnetlib/message/MessageType.java | 6 - .../java/eu/dnetlib/message/MessageTest.java | 51 ------- pom.xml | 5 - 7 files changed, 326 deletions(-) delete mode 100644 dhp-common/src/main/java/eu/dnetlib/message/Message.java delete mode 100644 dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java delete mode 100644 dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java delete mode 100644 dhp-common/src/main/java/eu/dnetlib/message/MessageType.java delete mode 100644 dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 6eb2e0358..a8607a9b3 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -53,11 +53,6 @@ com.fasterxml.jackson.core jackson-databind - - - com.rabbitmq - amqp-client - net.sf.saxon Saxon-HE diff --git a/dhp-common/src/main/java/eu/dnetlib/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/message/Message.java deleted file mode 100644 index fc1c38291..000000000 --- a/dhp-common/src/main/java/eu/dnetlib/message/Message.java +++ /dev/null @@ -1,76 +0,0 @@ - -package eu.dnetlib.message; - -import java.io.IOException; -import java.util.Map; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -public class Message { - - private String workflowId; - - private String jobName; - - private MessageType type; - - private Map body; - - public static Message fromJson(final String json) throws IOException { - final ObjectMapper jsonMapper = new ObjectMapper(); - return jsonMapper.readValue(json, Message.class); - } - - public Message() { - } - - public Message(String workflowId, String jobName, MessageType type, Map body) { - this.workflowId = workflowId; - this.jobName = jobName; - this.type = type; - this.body = body; - } - - public String getWorkflowId() { - return workflowId; - } - - public void setWorkflowId(String workflowId) { - this.workflowId = workflowId; - } - - public String getJobName() { - return jobName; - } - - public void setJobName(String jobName) { - this.jobName = jobName; - } - - public MessageType getType() { - return type; - } - - public void setType(MessageType type) { - this.type = type; - } - - public Map getBody() { - return body; - } - - public void setBody(Map body) { - this.body = body; - } - - @Override - public String toString() { - final ObjectMapper jsonMapper = new ObjectMapper(); - try { - return jsonMapper.writeValueAsString(this); - } catch (JsonProcessingException e) { - return null; - } - } -} diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java deleted file mode 100644 index fb3f0bd95..000000000 --- a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java +++ /dev/null @@ -1,47 +0,0 @@ - -package eu.dnetlib.message; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.LinkedBlockingQueue; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.Envelope; - -public class MessageConsumer extends DefaultConsumer { - - final LinkedBlockingQueue queueMessages; - - /** - * Constructs a new instance and records its association to the passed-in channel. - * - * @param channel the channel to which this consumer is attached - * @param queueMessages - */ - public MessageConsumer(Channel channel, LinkedBlockingQueue queueMessages) { - super(channel); - this.queueMessages = queueMessages; - } - - @Override - public void handleDelivery( - String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) - throws IOException { - final String json = new String(body, StandardCharsets.UTF_8); - Message message = Message.fromJson(json); - try { - this.queueMessages.put(message); - System.out.println("Receiving Message " + message); - } catch (InterruptedException e) { - if (message.getType() == MessageType.REPORT) - throw new RuntimeException("Error on sending message"); - else { - // TODO LOGGING EXCEPTION - } - } finally { - getChannel().basicAck(envelope.getDeliveryTag(), false); - } - } -} diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java deleted file mode 100644 index 5ca79f3cc..000000000 --- a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java +++ /dev/null @@ -1,136 +0,0 @@ - -package eu.dnetlib.message; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeoutException; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -public class MessageManager { - - private final String messageHost; - - private final String username; - - private final String password; - - private Connection connection; - - private final Map channels = new HashMap<>(); - - private boolean durable; - - private boolean autodelete; - - private final LinkedBlockingQueue queueMessages; - - public MessageManager( - String messageHost, - String username, - String password, - final LinkedBlockingQueue queueMessages) { - this.queueMessages = queueMessages; - this.messageHost = messageHost; - this.username = username; - this.password = password; - } - - public MessageManager( - String messageHost, - String username, - String password, - boolean durable, - boolean autodelete, - final LinkedBlockingQueue queueMessages) { - this.queueMessages = queueMessages; - this.messageHost = messageHost; - this.username = username; - this.password = password; - - this.durable = durable; - this.autodelete = autodelete; - } - - private Connection createConnection() throws IOException, TimeoutException { - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(this.messageHost); - factory.setUsername(this.username); - factory.setPassword(this.password); - return factory.newConnection(); - } - - private Channel createChannel( - final Connection connection, - final String queueName, - final boolean durable, - final boolean autodelete) - throws Exception { - Map args = new HashMap<>(); - args.put("x-message-ttl", 10000); - Channel channel = connection.createChannel(); - channel.queueDeclare(queueName, durable, false, this.autodelete, args); - return channel; - } - - private Channel getOrCreateChannel(final String queueName, boolean durable, boolean autodelete) - throws Exception { - if (channels.containsKey(queueName)) { - return channels.get(queueName); - } - - if (this.connection == null) { - this.connection = createConnection(); - } - channels.put(queueName, createChannel(this.connection, queueName, durable, autodelete)); - return channels.get(queueName); - } - - public void close() throws IOException { - channels - .values() - .forEach( - ch -> { - try { - ch.close(); - } catch (Exception e) { - // TODO LOG - } - }); - - this.connection.close(); - } - - public boolean sendMessage(final Message message, String queueName) throws Exception { - try { - Channel channel = getOrCreateChannel(queueName, this.durable, this.autodelete); - channel.basicPublish("", queueName, null, message.toString().getBytes()); - return true; - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - public boolean sendMessage( - final Message message, String queueName, boolean durable_var, boolean autodelete_var) - throws Exception { - try { - Channel channel = getOrCreateChannel(queueName, durable_var, autodelete_var); - channel.basicPublish("", queueName, null, message.toString().getBytes()); - return true; - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - public void startConsumingMessage( - final String queueName, final boolean durable, final boolean autodelete) throws Exception { - - Channel channel = createChannel(createConnection(), queueName, durable, autodelete); - channel.basicConsume(queueName, false, new MessageConsumer(channel, queueMessages)); - } -} diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java deleted file mode 100644 index 72cbda252..000000000 --- a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java +++ /dev/null @@ -1,6 +0,0 @@ - -package eu.dnetlib.message; - -public enum MessageType { - ONGOING, REPORT -} diff --git a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java deleted file mode 100644 index 442f7b5c2..000000000 --- a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java +++ /dev/null @@ -1,51 +0,0 @@ - -package eu.dnetlib.message; - -import static org.junit.jupiter.api.Assertions.*; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.junit.jupiter.api.Test; - -public class MessageTest { - - @Test - public void fromJsonTest() throws IOException { - Message m = new Message(); - m.setWorkflowId("wId"); - m.setType(MessageType.ONGOING); - m.setJobName("Collection"); - Map body = new HashMap<>(); - body.put("parsedItem", "300"); - body.put("ExecutionTime", "30s"); - - m.setBody(body); - System.out.println("m = " + m); - Message m1 = Message.fromJson(m.toString()); - assertEquals(m1.getWorkflowId(), m.getWorkflowId()); - assertEquals(m1.getType(), m.getType()); - assertEquals(m1.getJobName(), m.getJobName()); - - assertNotNull(m1.getBody()); - m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it))); - assertEquals(m1.getJobName(), m.getJobName()); - } - - @Test - public void toStringTest() { - final String expectedJson = "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}"; - Message m = new Message(); - m.setWorkflowId("wId"); - m.setType(MessageType.ONGOING); - m.setJobName("Collection"); - Map body = new HashMap<>(); - body.put("parsedItem", "300"); - body.put("ExecutionTime", "30s"); - - m.setBody(body); - - assertEquals(expectedJson, m.toString()); - } -} diff --git a/pom.xml b/pom.xml index 3e0626aed..cfe1edfbd 100644 --- a/pom.xml +++ b/pom.xml @@ -374,11 +374,6 @@ provided - - com.rabbitmq - amqp-client - 5.6.0 - com.jayway.jsonpath json-path