dnet-applications/dhp-broker-application/.svn/pristine/11/11a68f3ba52270300c16b36b3df...

105 lines
2.6 KiB
Plaintext

package eu.dnetlib.lbs.events.input;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.utils.LbsQueue;
/**
 * Runnable that connects to a RabbitMQ broker, subscribes to a queue and forwards
 * every delivered message body (decoded as UTF-8 text) to a local {@link LbsQueue}.
 *
 * <p>{@link #run()} only sets up the subscription and then returns; deliveries are
 * handled by the callback registered on the channel.
 */
public class RabbitMQConsumer implements Runnable {

    private static final Log log = LogFactory.getLog(RabbitMQConsumer.class);

    private final String queue;
    private final String host;
    private final int port;
    private final String username;
    private final String password;
    private final LbsQueue<String, Event> localQueue;

    /**
     * Creates a consumer; no connection is opened until {@link #run()} is invoked.
     *
     * @param queue      name of the RabbitMQ queue to declare and consume from
     * @param host       broker host
     * @param port       broker port
     * @param username   user name used to authenticate against the broker
     * @param password   password used to authenticate against the broker
     * @param localQueue local queue that receives the text of each delivered message
     */
    public RabbitMQConsumer(final String queue, final String host, final int port, final String username, final String password,
            final LbsQueue<String, Event> localQueue) {
        this.queue = queue;
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.localQueue = localQueue;
    }

    /**
     * Opens a connection (with automatic recovery enabled), declares the queue and
     * registers a consumer with manual acknowledgement.
     *
     * <p>Any failure during setup is logged, not rethrown. If the connection was
     * already opened when the failure happened, it is closed so that it does not leak.
     */
    @Override
    public void run() {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(this.host);
        factory.setPort(this.port);
        factory.setUsername(this.username);
        factory.setPassword(this.password);
        factory.setAutomaticRecoveryEnabled(true);

        log.info("Starting rabbitMQ consumer: " + Thread.currentThread().getName());

        // Declared outside the try block so that it can be released if a later setup step fails.
        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();

            final DefaultConsumer consumer = new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(final String consumerTag,
                        final Envelope envelope,
                        final AMQP.BasicProperties properties,
                        final byte[] body)
                        throws IOException {
                    try {
                        getLocalQueue().offer(new String(body, StandardCharsets.UTF_8));
                    } catch (final Throwable e) {
                        log.error("Error processing event", e);
                    } finally {
                        // The delivery is acknowledged even when processing failed,
                        // so a message that cannot be handled is not left unacknowledged.
                        getChannel().basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Arguments: durable = true, exclusive = false, autoDelete = false, no extra arguments.
            channel.queueDeclare(this.queue, true, false, false, null);
            // autoAck = false: acknowledgements are sent explicitly in handleDelivery.
            channel.basicConsume(this.queue, false, consumer);
        } catch (final Exception e) {
            log.error("Error creating consumer", e);
            closeQuietly(connection);
        }
    }

    /**
     * Closes the given connection after a failed setup, logging (not propagating) any error.
     *
     * @param connection the connection to close; may be {@code null} if it was never opened
     */
    private static void closeQuietly(final Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (final Exception e) {
            log.warn("Error closing rabbitMQ connection after failed consumer setup", e);
        }
    }

    /** @return the name of the RabbitMQ queue this consumer reads from */
    public String getQueue() {
        return this.queue;
    }

    /** @return the broker host */
    public String getHost() {
        return this.host;
    }

    /** @return the broker port */
    public int getPort() {
        return this.port;
    }

    /** @return the user name used to authenticate against the broker */
    public String getUsername() {
        return this.username;
    }

    /** @return the password used to authenticate against the broker */
    public String getPassword() {
        return this.password;
    }

    /** @return the local queue that receives the text of each delivered message */
    public LbsQueue<String, Event> getLocalQueue() {
        return this.localQueue;
    }
}