removed rabbitMq
This commit is contained in:
parent
157e0a359f
commit
c934ff4a81
|
@ -53,25 +53,15 @@ public class AjaxController extends AbstractLbsController {
|
||||||
@Autowired
|
@Autowired
|
||||||
private ThreadManager threadManager;
|
private ThreadManager threadManager;
|
||||||
|
|
||||||
@Value("${lbs.rabbit.homepage}")
|
|
||||||
private String rabbitmqUiUrl;
|
|
||||||
|
|
||||||
@Value("${lbs.elastic.homepage}")
|
@Value("${lbs.elastic.homepage}")
|
||||||
private String elasticSearchUiUrl;
|
private String elasticSearchUiUrl;
|
||||||
|
|
||||||
@Value("${lbs.rabbit.enabled}")
|
|
||||||
private boolean rabbitmqEnabled;
|
|
||||||
|
|
||||||
@GetMapping("externalTools")
|
@GetMapping("externalTools")
|
||||||
public List<Tool> listExternalUrls() {
|
public List<Tool> listExternalUrls() {
|
||||||
final List<Tool> tools = new ArrayList<>();
|
final List<Tool> tools = new ArrayList<>();
|
||||||
|
|
||||||
tools.add(new Tool("ElasticSearch", elasticSearchUiUrl));
|
tools.add(new Tool("ElasticSearch", elasticSearchUiUrl));
|
||||||
|
|
||||||
if (rabbitmqEnabled) {
|
|
||||||
tools.add(new Tool("RabbitMQ", rabbitmqUiUrl));
|
|
||||||
}
|
|
||||||
|
|
||||||
return tools;
|
return tools;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,6 @@ import org.springframework.stereotype.Component;
|
||||||
import eu.dnetlib.lbs.properties.DatabaseProperties;
|
import eu.dnetlib.lbs.properties.DatabaseProperties;
|
||||||
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
|
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
|
||||||
import eu.dnetlib.lbs.properties.EmailProperties;
|
import eu.dnetlib.lbs.properties.EmailProperties;
|
||||||
import eu.dnetlib.lbs.properties.RabbitMQProperties;
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class CurrentStatus {
|
public class CurrentStatus {
|
||||||
|
@ -20,8 +19,7 @@ public class CurrentStatus {
|
||||||
private DatabaseProperties h2;
|
private DatabaseProperties h2;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ElasticSearchProperties elasticSearch;
|
private ElasticSearchProperties elasticSearch;
|
||||||
@Autowired
|
|
||||||
private RabbitMQProperties rabbitMQ;
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private EmailProperties emails;
|
private EmailProperties emails;
|
||||||
|
|
||||||
|
@ -74,14 +72,6 @@ public class CurrentStatus {
|
||||||
this.elasticSearch = elasticSearch;
|
this.elasticSearch = elasticSearch;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RabbitMQProperties getRabbitMQ() {
|
|
||||||
return this.rabbitMQ;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setRabbitMQ(final RabbitMQProperties rabbitMQ) {
|
|
||||||
this.rabbitMQ = rabbitMQ;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DatabaseProperties getH2() {
|
public DatabaseProperties getH2() {
|
||||||
return this.h2;
|
return this.h2;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,104 +0,0 @@
|
||||||
package eu.dnetlib.lbs.events.input;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import com.rabbitmq.client.AMQP;
|
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import com.rabbitmq.client.Connection;
|
|
||||||
import com.rabbitmq.client.ConnectionFactory;
|
|
||||||
import com.rabbitmq.client.DefaultConsumer;
|
|
||||||
import com.rabbitmq.client.Envelope;
|
|
||||||
|
|
||||||
import eu.dnetlib.lbs.elasticsearch.Event;
|
|
||||||
import eu.dnetlib.lbs.utils.LbsQueue;
|
|
||||||
|
|
||||||
public class RabbitMQConsumer implements Runnable {
|
|
||||||
|
|
||||||
private final String queue;
|
|
||||||
private final String host;
|
|
||||||
private final int port;
|
|
||||||
private final String username;
|
|
||||||
private final String password;
|
|
||||||
private final LbsQueue<String, Event> localQueue;
|
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(RabbitMQConsumer.class);
|
|
||||||
|
|
||||||
public RabbitMQConsumer(final String queue, final String host, final int port, final String username, final String password,
|
|
||||||
final LbsQueue<String, Event> localQueue) {
|
|
||||||
super();
|
|
||||||
this.queue = queue;
|
|
||||||
this.host = host;
|
|
||||||
this.port = port;
|
|
||||||
this.username = username;
|
|
||||||
this.password = password;
|
|
||||||
this.localQueue = localQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
final ConnectionFactory factory = new ConnectionFactory();
|
|
||||||
factory.setHost(this.host);
|
|
||||||
factory.setPort(this.port);
|
|
||||||
factory.setUsername(this.username);
|
|
||||||
factory.setPassword(this.password);
|
|
||||||
factory.setAutomaticRecoveryEnabled(true);
|
|
||||||
|
|
||||||
log.info("Starting rabbitMQ consumer: " + Thread.currentThread().getName());
|
|
||||||
|
|
||||||
try {
|
|
||||||
final Connection connection = factory.newConnection();
|
|
||||||
final Channel channel = connection.createChannel();
|
|
||||||
final DefaultConsumer consumer = new DefaultConsumer(channel) {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handleDelivery(final String consumerTag,
|
|
||||||
final Envelope envelope,
|
|
||||||
final AMQP.BasicProperties properties,
|
|
||||||
final byte[] body)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
try {
|
|
||||||
getLocalQueue().offer(new String(body, "UTF-8"));
|
|
||||||
} catch (final Throwable e) {
|
|
||||||
log.error("Error processing event", e);
|
|
||||||
} finally {
|
|
||||||
getChannel().basicAck(envelope.getDeliveryTag(), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
channel.queueDeclare(this.queue, true, false, false, null);
|
|
||||||
channel.basicConsume(this.queue, false, consumer);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
log.error("Error creating consumer", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getQueue() {
|
|
||||||
return this.queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getHost() {
|
|
||||||
return this.host;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getPort() {
|
|
||||||
return this.port;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getUsername() {
|
|
||||||
return this.username;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPassword() {
|
|
||||||
return this.password;
|
|
||||||
}
|
|
||||||
|
|
||||||
public LbsQueue<String, Event> getLocalQueue() {
|
|
||||||
return this.localQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,27 +0,0 @@
|
||||||
package eu.dnetlib.lbs.events.input;
|
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import eu.dnetlib.lbs.elasticsearch.Event;
|
|
||||||
import eu.dnetlib.lbs.properties.RabbitMQProperties;
|
|
||||||
import eu.dnetlib.lbs.utils.LbsQueue;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@ConditionalOnProperty(prefix = "lbs.rabbit", name = "enabled")
|
|
||||||
public class RabbitMQConsumerFactory {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private RabbitMQProperties props;
|
|
||||||
|
|
||||||
public RabbitMQConsumer newConsumer(final LbsQueue<String, Event> localQueue) {
|
|
||||||
final String queue = this.props.getQueue();
|
|
||||||
final String host = this.props.getHost();
|
|
||||||
final int port = this.props.getPort();
|
|
||||||
final String username = this.props.getUsername();
|
|
||||||
final String password = this.props.getPassword();
|
|
||||||
return new RabbitMQConsumer(queue, host, port, username, password, localQueue);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,56 +0,0 @@
|
||||||
package eu.dnetlib.lbs.events.input;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import eu.dnetlib.lbs.elasticsearch.Event;
|
|
||||||
import eu.dnetlib.lbs.events.manager.EventManager;
|
|
||||||
import eu.dnetlib.lbs.events.manager.EventManagerFactory;
|
|
||||||
import eu.dnetlib.lbs.properties.RabbitMQProperties;
|
|
||||||
import eu.dnetlib.lbs.topics.TopicTypeRepository;
|
|
||||||
import eu.dnetlib.lbs.utils.EventVerifier;
|
|
||||||
import eu.dnetlib.lbs.utils.JsonMessageToEventFunction;
|
|
||||||
import eu.dnetlib.lbs.utils.LbsQueue;
|
|
||||||
import eu.dnetlib.lbs.utils.QueueManager;
|
|
||||||
import eu.dnetlib.lbs.utils.ThreadManager;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@ConditionalOnProperty(prefix = "lbs.rabbit", name = "enabled")
|
|
||||||
public class RabbitMQConsumerLauncher {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private EventManagerFactory eventManagerFactory;
|
|
||||||
@Autowired
|
|
||||||
private RabbitMQConsumerFactory rabbitMQConsumerFactory;
|
|
||||||
@Autowired
|
|
||||||
private QueueManager queueManager;
|
|
||||||
@Autowired
|
|
||||||
private TopicTypeRepository topicTypeRepo;
|
|
||||||
@Autowired
|
|
||||||
private RabbitMQProperties props;
|
|
||||||
@Autowired
|
|
||||||
private ThreadManager threadManager;
|
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(RabbitMQConsumerLauncher.class);
|
|
||||||
|
|
||||||
@PostConstruct
|
|
||||||
public void init() {
|
|
||||||
for (int i = 0; i < this.props.getNumberOfConsumers(); i++) {
|
|
||||||
final LbsQueue<String, Event> queue =
|
|
||||||
this.queueManager.newQueue("event-manager-" + i, new JsonMessageToEventFunction(), new EventVerifier(this.topicTypeRepo));
|
|
||||||
final EventManager manager = this.eventManagerFactory.newEventManager(queue);
|
|
||||||
final RabbitMQConsumer consumer = this.rabbitMQConsumerFactory.newConsumer(queue);
|
|
||||||
|
|
||||||
log.info("Launching indexer and consumer threads: " + i);
|
|
||||||
|
|
||||||
this.threadManager.newThread("rabbit-manager-" + i, manager);
|
|
||||||
this.threadManager.newThread("rabbit-consumer-" + i, consumer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,93 +0,0 @@
|
||||||
package eu.dnetlib.lbs.properties;
|
|
||||||
|
|
||||||
import javax.validation.constraints.NotNull;
|
|
||||||
|
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
@Component
|
|
||||||
@ConfigurationProperties(prefix = "lbs.rabbit")
|
|
||||||
public class RabbitMQProperties {
|
|
||||||
|
|
||||||
@NotNull
|
|
||||||
private boolean enabled;
|
|
||||||
@NotNull
|
|
||||||
private String host;
|
|
||||||
@NotNull
|
|
||||||
private int port;
|
|
||||||
@NotNull
|
|
||||||
private String username;
|
|
||||||
@NotNull
|
|
||||||
private String password;
|
|
||||||
@NotNull
|
|
||||||
private String queue;
|
|
||||||
@NotNull
|
|
||||||
private int numberOfConsumers;
|
|
||||||
@NotNull
|
|
||||||
private String homepage;
|
|
||||||
|
|
||||||
public boolean isEnabled() {
|
|
||||||
return this.enabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setEnabled(final boolean enabled) {
|
|
||||||
this.enabled = enabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getHost() {
|
|
||||||
return this.host;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHost(final String host) {
|
|
||||||
this.host = host;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getPort() {
|
|
||||||
return this.port;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setPort(final int port) {
|
|
||||||
this.port = port;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getUsername() {
|
|
||||||
return this.username;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setUsername(final String username) {
|
|
||||||
this.username = username;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPassword() {
|
|
||||||
return this.password;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setPassword(final String password) {
|
|
||||||
this.password = password;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getQueue() {
|
|
||||||
return this.queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setQueue(final String queue) {
|
|
||||||
this.queue = queue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getNumberOfConsumers() {
|
|
||||||
return this.numberOfConsumers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setNumberOfConsumers(final int numberOfConsumers) {
|
|
||||||
this.numberOfConsumers = numberOfConsumers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getHomepage() {
|
|
||||||
return this.homepage;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHomepage(final String homepage) {
|
|
||||||
this.homepage = homepage;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -11,29 +11,13 @@ spring.jpa.database-platform = org.hibernate.dialect.H2Dialect
|
||||||
lbs.database.driverName = ${spring.datasource.driver-class-name}
|
lbs.database.driverName = ${spring.datasource.driver-class-name}
|
||||||
lbs.database.url = ${spring.datasource.url}
|
lbs.database.url = ${spring.datasource.url}
|
||||||
|
|
||||||
lbs.rabbit.enabled = false
|
|
||||||
|
|
||||||
# for development server
|
# for development server
|
||||||
#lbs.rabbit.host = broker1-dev-dnet.d4science.org
|
|
||||||
#lbs.rabbit.port = 5672
|
|
||||||
#lbs.rabbit.username = r_admin
|
|
||||||
#lbs.rabbit.password = 9g8fed7gpohef9y84th98h
|
|
||||||
#lbs.rabbit.queue = test_queue
|
|
||||||
#lbs.rabbit.numberOfConsumers = 4
|
|
||||||
#lbs.rabbit.homepage = http://${lbs.rabbit.host}:15672
|
|
||||||
#lbs.elastic.clusterNodes = broker1-dev-dnet.d4science.org:9300
|
#lbs.elastic.clusterNodes = broker1-dev-dnet.d4science.org:9300
|
||||||
#lbs.elastic.clusterName = D-Net Dev Broker
|
#lbs.elastic.clusterName = D-Net Dev Broker
|
||||||
#lbs.elastic.homepage = http://broker1-dev-dnet.d4science.org:9200/_plugin/hq
|
#lbs.elastic.homepage = http://broker1-dev-dnet.d4science.org:9200/_plugin/hq
|
||||||
|
|
||||||
|
|
||||||
# for localhost
|
# for localhost
|
||||||
lbs.rabbit.host = 127.0.0.1
|
|
||||||
lbs.rabbit.port = 5672
|
|
||||||
lbs.rabbit.username = admin
|
|
||||||
lbs.rabbit.password = driver
|
|
||||||
lbs.rabbit.queue = test_queue
|
|
||||||
lbs.rabbit.numberOfConsumers = 2
|
|
||||||
lbs.rabbit.homepage = http://${lbs.rabbit.host}:15672
|
|
||||||
lbs.elastic.clusterNodes = 127.0.0.1:9200
|
lbs.elastic.clusterNodes = 127.0.0.1:9200
|
||||||
lbs.elastic.clusterName = es_broker_dev
|
lbs.elastic.clusterName = es_broker_dev
|
||||||
lbs.elastic.homepage = http://127.0.0.1:9200/_plugin/hq
|
lbs.elastic.homepage = http://127.0.0.1:9200/_plugin/hq
|
||||||
|
|
|
@ -148,32 +148,9 @@
|
||||||
</table>
|
</table>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="col-xs-12 col-md-6">
|
|
||||||
<div class="panel panel-default">
|
|
||||||
<div class="panel-heading">RabbitMQ</div>
|
|
||||||
<table class="table" ng-show="summary.rabbitMQ.enabled">
|
|
||||||
<tr>
|
|
||||||
<th class="col-xs-4">host</th>
|
|
||||||
<td class="col-xs-8">{{summary.rabbitMQ.host}}</td>
|
|
||||||
</tr>
|
|
||||||
<tr>
|
|
||||||
<th class="col-xs-4">port</th>
|
|
||||||
<td class="col-xs-8">{{summary.rabbitMQ.port}}</td>
|
|
||||||
</tr>
|
|
||||||
<tr>
|
|
||||||
<th class="col-xs-4">queue</th>
|
|
||||||
<td class="col-xs-8">{{summary.rabbitMQ.queue}}</td>
|
|
||||||
</tr>
|
|
||||||
</table>
|
|
||||||
<div class="panel-body" ng-hide="summary.rabbitMQ.enabled"><span class="text-muted">Support of RabbitMQ is disabled</span></div>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="row">
|
<div class="row">
|
||||||
|
|
||||||
<div class="col-xs-12 col-md-6">
|
<div class="col-xs-12 col-md-6">
|
||||||
<div class="panel panel-default">
|
<div class="panel panel-default">
|
||||||
<div class="panel-heading">H2 database</div>
|
<div class="panel-heading">H2 database</div>
|
||||||
|
|
Loading…
Reference in New Issue