Fork 0

implemented comunication layer using rabbitMq between oozie node and Dnet

This commit is contained in:
Sandro La Bruzzo 2019-04-05 12:19:25 +02:00
parent 3f4ba71bbd
commit 9294851a6c
8 changed files with 321 additions and 4 deletions

View File

@ -67,6 +67,13 @@
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->

View File

@ -57,12 +57,8 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner {
public void run(final String... args) throws Exception {
if (args.length == 0) { return; }
if (args.length != 3) { throw new DnetCollectorException("Invalid number of parameters, expected: hdfs_path and json_api_description"); }
//TODO : migrate to https://commons.apache.org/proper/commons-cli/usage.html
final String hdfsPath = args[0];
log.info("hdfsPath ="+hdfsPath);

View File

@ -29,6 +29,21 @@
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->

View File

@ -0,0 +1,82 @@
package eu.dnetlib.message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class Message {
private String workflowId;
private String jobName;
private MessageType type;
private Map<String, String> body;
public static Message fromJson(final String json) throws IOException {
final ObjectMapper jsonMapper = new ObjectMapper();
return jsonMapper.readValue(json, Message.class);
public Message() {
public Message(String workflowId, String jobName, MessageType type, Map<String, String> body) {
this.workflowId = workflowId;
this.jobName = jobName;
this.type = type;
this.body = body;
public String getWorkflowId() {
return workflowId;
public void setWorkflowId(String workflowId) {
this.workflowId = workflowId;
public String getJobName() {
return jobName;
public void setJobName(String jobName) {
this.jobName = jobName;
public MessageType getType() {
return type;
public void setType(MessageType type) {
this.type = type;
public Map<String, String> getBody() {
return body;
public void setBody(Map<String, String> body) {
this.body = body;
public String toString() {
final ObjectMapper jsonMapper = new ObjectMapper();
try {
return jsonMapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return null;

View File

@ -0,0 +1,44 @@
package eu.dnetlib.message;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageConsumer extends DefaultConsumer {
final LinkedBlockingQueue<Message> queueMessages;
* Constructs a new instance and records its association to the passed-in channel.
* @param channel the channel to which this consumer is attached
* @param queueMessages
public MessageConsumer(Channel channel, LinkedBlockingQueue<Message> queueMessages) {
this.queueMessages = queueMessages;
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final String json = new String(body, StandardCharsets.UTF_8);
Message message = Message.fromJson(json);
try {
System.out.println("Receiving Message "+message);
} catch (InterruptedException e) {
if (message.getType()== MessageType.REPORT)
throw new RuntimeException("Error on sending message");
else {

View File

@ -0,0 +1,82 @@
package eu.dnetlib.message;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import sun.rmi.runtime.Log;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageManager {
private final String messageHost;
private final String username;
private final String password;
private boolean durable;
private boolean autodelete;
final private LinkedBlockingQueue<Message> queueMessages;
public MessageManager(String messageHost, String username, String password, final LinkedBlockingQueue<Message> queueMessages) {
this.queueMessages = queueMessages;
this.messageHost = messageHost;
this.username = username;
this.password = password;
public MessageManager(String messageHost, String username, String password, boolean durable, boolean autodelete, final LinkedBlockingQueue<Message> queueMessages) {
this.queueMessages = queueMessages;
this.messageHost = messageHost;
this.username = username;
this.password = password;
this.durable = durable;
this.autodelete = autodelete;
private Channel createChannel(final String queueName, final boolean durable, final boolean autodelete ) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 10000);
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, durable, false, this.autodelete, args);
return channel;
public boolean sendMessage(final Message message, String queueName) throws Exception {
try (Channel channel = createChannel(queueName, this.durable, this.autodelete)) {
channel.basicPublish("", queueName,null, message.toString().getBytes());
return true;
} catch (Throwable e) {
return false;
public boolean sendMessage(final Message message, String queueName, boolean durable_var, boolean autodelete_var) throws Exception {
try (Channel channel = createChannel(queueName, durable_var, autodelete_var)) {
channel.basicPublish("", queueName,null, message.toString().getBytes());
return true;
} catch (Throwable e) {
return false;
public void startConsumingMessage(final String queueName, final boolean durable, final boolean autodelete) throws Exception{
Channel channel = createChannel(queueName, durable, autodelete);
channel.basicConsume(queueName, false, new MessageConsumer(channel,queueMessages));

View File

@ -0,0 +1,8 @@
package eu.dnetlib.message;
public enum MessageType {

View File

@ -0,0 +1,83 @@
package eu.dnetlib.message;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.*;
public class MessageTest {
public void fromJsonTest() throws IOException {
Message m = new Message();
Map<String,String> body= new HashMap<>();
body.put("parsedItem", "300");
body.put("ExecutionTime", "30s");
System.out.println("m = " + m);
Message m1 = Message.fromJson(m.toString());
assertEquals(m1.getWorkflowId(), m.getWorkflowId());
assertEquals(m1.getType(), m.getType());
assertEquals(m1.getJobName(), m.getJobName());
m1.getBody().keySet().forEach(it -> assertEquals(m1.getBody().get(it), m.getBody().get(it)));
assertEquals(m1.getJobName(), m.getJobName());
public void toStringTest() {
final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
Message m = new Message();
Map<String,String> body= new HashMap<>();
body.put("parsedItem", "300");
body.put("ExecutionTime", "30s");
public void sendMessageTest() throws Exception {
final String expectedJson= "{\"workflowId\":\"wId\",\"jobName\":\"Collection\",\"type\":\"ONGOING\",\"body\":{\"ExecutionTime\":\"30s\",\"parsedItem\":\"300\"}}";
Message m = new Message();
Map<String,String> body= new HashMap<>();
body.put("progressCount", "100");
body.put("ExecutionTime", "30s");
MessageManager mm = new MessageManager("broker1-dev-dnet.d4science.org","r_admin", "9g8fed7gpohef9y84th98h", false,false, null);
mm.sendMessage(m, "dev_ongoing");
body.put("mdStoreSize", "368");
mm.sendMessage(m, "dev_report", true, false);