2020-04-27 14:52:31 +02:00
|
|
|
|
2019-04-05 12:19:25 +02:00
|
|
|
package eu.dnetlib.message;
|
|
|
|
|
2020-04-28 11:23:29 +02:00
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
2019-04-05 12:19:25 +02:00
|
|
|
import com.rabbitmq.client.AMQP;
|
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
|
import com.rabbitmq.client.DefaultConsumer;
|
|
|
|
import com.rabbitmq.client.Envelope;
|
|
|
|
|
|
|
|
public class MessageConsumer extends DefaultConsumer {
|
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
final LinkedBlockingQueue<Message> queueMessages;
|
2019-04-05 12:19:25 +02:00
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
/**
|
|
|
|
* Constructs a new instance and records its association to the passed-in channel.
|
|
|
|
*
|
|
|
|
* @param channel the channel to which this consumer is attached
|
|
|
|
* @param queueMessages
|
|
|
|
*/
|
|
|
|
public MessageConsumer(Channel channel, LinkedBlockingQueue<Message> queueMessages) {
|
|
|
|
super(channel);
|
|
|
|
this.queueMessages = queueMessages;
|
|
|
|
}
|
2019-04-05 12:19:25 +02:00
|
|
|
|
2020-04-27 14:52:31 +02:00
|
|
|
@Override
|
|
|
|
public void handleDelivery(
|
|
|
|
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
|
|
|
|
throws IOException {
|
|
|
|
final String json = new String(body, StandardCharsets.UTF_8);
|
|
|
|
Message message = Message.fromJson(json);
|
|
|
|
try {
|
|
|
|
this.queueMessages.put(message);
|
|
|
|
System.out.println("Receiving Message " + message);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
if (message.getType() == MessageType.REPORT)
|
|
|
|
throw new RuntimeException("Error on sending message");
|
|
|
|
else {
|
|
|
|
// TODO LOGGING EXCEPTION
|
|
|
|
}
|
|
|
|
} finally {
|
|
|
|
getChannel().basicAck(envelope.getDeliveryTag(), false);
|
|
|
|
}
|
|
|
|
}
|
2019-04-05 12:19:25 +02:00
|
|
|
}
|