From 9294851a6c713c2ec68494a71780a704f7c9ef6a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 5 Apr 2019 12:19:25 +0200 Subject: [PATCH] implemented comunication layer using rabbitMq between oozie node and Dnet --- dhp-applications/dhp-collector-worker/pom.xml | 7 ++ .../DnetCollectorWorkerApplication.java | 4 - dhp-common/pom.xml | 15 ++++ .../main/java/eu/dnetlib/message/Message.java | 82 ++++++++++++++++++ .../eu/dnetlib/message/MessageConsumer.java | 44 ++++++++++ .../eu/dnetlib/message/MessageManager.java | 82 ++++++++++++++++++ .../java/eu/dnetlib/message/MessageType.java | 8 ++ .../java/eu/dnetlib/message/MessageTest.java | 83 +++++++++++++++++++ 8 files changed, 321 insertions(+), 4 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/message/Message.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java create mode 100644 dhp-common/src/main/java/eu/dnetlib/message/MessageType.java create mode 100644 dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java diff --git a/dhp-applications/dhp-collector-worker/pom.xml b/dhp-applications/dhp-collector-worker/pom.xml index af82916517..8dec778e68 100644 --- a/dhp-applications/dhp-collector-worker/pom.xml +++ b/dhp-applications/dhp-collector-worker/pom.xml @@ -67,6 +67,13 @@ jaxen jaxen + + + com.rabbitmq + amqp-client + 5.6.0 + + org.springframework.boot spring-boot-starter-test diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java index c2854b8f28..d654a30e8e 100644 --- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java +++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java @@ -57,12 +57,8 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner { public void run(final String... args) throws Exception { if (args.length == 0) { return; } if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); } - //TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html - - - final String hdfsPath = args[0]; log.info("hdfsPath ="+hdfsPath); diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 2de3dc310f..cac0f08c2d 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -29,6 +29,21 @@ 2.2 provided + + + com.fasterxml.jackson.core + jackson-databind + 2.6.5 + provided + + + + + + com.rabbitmq + amqp-client + 5.6.0 + diff --git a/dhp-common/src/main/java/eu/dnetlib/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/message/Message.java new file mode 100644 index 0000000000..3767bd0260 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/message/Message.java @@ -0,0 +1,82 @@ +package eu.dnetlib.message; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.Map; + +public class Message { + + private String workflowId; + + private String jobName; + + private MessageType type; + + private Map body; + + + public static Message fromJson(final String json) throws IOException { + final ObjectMapper jsonMapper = new ObjectMapper(); + return jsonMapper.readValue(json, Message.class); + + + } + + + public Message() { + + + + } + + public Message(String workflowId, String jobName, MessageType type, Map body) { + this.workflowId = workflowId; + this.jobName = jobName; + this.type = type; + this.body = body; + } + + public String getWorkflowId() { + return workflowId; + } + + public void setWorkflowId(String workflowId) { + this.workflowId = workflowId; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public MessageType getType() { + return type; + } + + public void setType(MessageType type) { + this.type = type; + } + + public Map getBody() { + return body; + } + + public void setBody(Map body) { + this.body = body; + } + + @Override + public String toString() { + final ObjectMapper jsonMapper = new ObjectMapper(); + try { + return jsonMapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + return null; + } + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java new file mode 100644 index 0000000000..ee58f11e86 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageConsumer.java @@ -0,0 +1,44 @@ +package eu.dnetlib.message; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.LinkedBlockingQueue; + +public class MessageConsumer extends DefaultConsumer { + + final LinkedBlockingQueue queueMessages; + + + /** + * Constructs a new instance and records its association to the passed-in channel. + * + * @param channel the channel to which this consumer is attached + * @param queueMessages + */ + public MessageConsumer(Channel channel, LinkedBlockingQueue queueMessages) { + super(channel); + this.queueMessages = queueMessages; + } + + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + final String json = new String(body, StandardCharsets.UTF_8); + Message message = Message.fromJson(json); + try { + this.queueMessages.put(message); + System.out.println("Receiving Message "+message); + } catch (InterruptedException e) { + if (message.getType()== MessageType.REPORT) + throw new RuntimeException("Error on sending message"); + else { + //TODO LOGGING EXCEPTION + } + } + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java new file mode 100644 index 0000000000..8db9f3ce15 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java @@ -0,0 +1,82 @@ +package eu.dnetlib.message; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import sun.rmi.runtime.Log; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +public class MessageManager { + + private final String messageHost; + + private final String username; + + private final String password; + + + private boolean durable; + + private boolean autodelete; + + final private LinkedBlockingQueue queueMessages; + + public MessageManager(String messageHost, String username, String password, final LinkedBlockingQueue queueMessages) { + this.queueMessages = queueMessages; + this.messageHost = messageHost; + this.username = username; + this.password = password; + } + + + public MessageManager(String messageHost, String username, String password, boolean durable, boolean autodelete, final LinkedBlockingQueue queueMessages) { + this.queueMessages = queueMessages; + this.messageHost = messageHost; + this.username = username; + this.password = password; + + this.durable = durable; + this.autodelete = autodelete; + } + private Channel createChannel(final String queueName, final boolean durable, final boolean autodelete ) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(this.messageHost); + factory.setUsername(this.username); + factory.setPassword(this.password); + Connection connection = factory.newConnection(); + Map args = new HashMap(); + args.put("x-message-ttl", 10000); + Channel channel = connection.createChannel(); + channel.queueDeclare(queueName, durable, false, this.autodelete, args); + return channel; + } + public boolean sendMessage(final Message message, String queueName) throws Exception { + try (Channel channel = createChannel(queueName, this.durable, this.autodelete)) { + + channel.basicPublish("", queueName,null, message.toString().getBytes()); + return true; + } catch (Throwable e) { + e.printStackTrace(); + return false; + } + } + + public boolean sendMessage(final Message message, String queueName, boolean durable_var, boolean autodelete_var) throws Exception { + try (Channel channel = createChannel(queueName, durable_var, autodelete_var)) { + + channel.basicPublish("", queueName,null, message.toString().getBytes()); + return true; + } catch (Throwable e) { + e.printStackTrace(); + return false; + } + } + + public void startConsumingMessage(final String queueName, final boolean durable, final boolean autodelete) throws Exception{ + Channel channel = createChannel(queueName, durable, autodelete); + channel.basicConsume(queueName, false, new MessageConsumer(channel,queueMessages)); + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java new file mode 100644 index 0000000000..c2440c3fe7 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageType.java @@ -0,0 +1,8 @@ +package eu.dnetlib.message; + +public enum MessageType { + + ONGOING, + REPORT + +} diff --git a/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java new file mode 100644 index 0000000000..baed36304d --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/message/MessageTest.java @@ -0,0 +1,83 @@ +package eu.dnetlib.message; + +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; + +public class MessageTest { + + @Test + public void fromJsonTest() throws IOException { + Message m = new Message(); + m.setWorkflowId("wId"); + m.setType(MessageType.ONGOING); + m.setJobName("Collection"); + Map body= new HashMap<>(); + body.put("parsedItem", "300"); + body.put("ExecutionTime", "30s"); + + m.setBody(body); + System.out.println("m = " + m); + Message m1 = Message.fromJson(m.toString()); + assertEquals(m1.getWorkflowId(), m.getWorkflowId()); + assertEquals(m1.getType(), m.getType()); + assertEquals(m1.getJobName(), m.getJobName()); + + assertNotNull(m1.getBody()); + m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it))); + assertEquals(m1.getJobName(), m.getJobName()); + } + + @Test + public void toStringTest() { + final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}"; + Message m = new Message(); + m.setWorkflowId("wId"); + m.setType(MessageType.ONGOING); + m.setJobName("Collection"); + Map body= new HashMap<>(); + body.put("parsedItem", "300"); + body.put("ExecutionTime", "30s"); + + m.setBody(body); + + assertEquals(expectedJson,m.toString()); + + + } + + + @Test + public void sendMessageTest() throws Exception { + + final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}"; + Message m = new Message(); + m.setWorkflowId("wf_20190405_105048_275"); + m.setType(MessageType.ONGOING); + m.setJobName("Collection"); + Map body= new HashMap<>(); + body.put("progressCount", "100"); + body.put("ExecutionTime", "30s"); + + m.setBody(body); + + MessageManager mm = new MessageManager("broker1-dev-dnet.d4science.org","r_admin", "9g8fed7gpohef9y84th98h", false,false, null); + + + + + + mm.sendMessage(m, "dev_ongoing"); + + m.setType(MessageType.REPORT); + + body.put("mdStoreSize", "368"); + + + mm.sendMessage(m, "dev_report", true, false); + } +} \ No newline at end of file