forked from antonis.lempesis/dnet-hadoop
messages sender
This commit is contained in:
parent
1b9731632b
commit
26d2eb946f
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.message;
|
package eu.dnetlib.dhp.message;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -15,18 +15,15 @@ public class Message implements Serializable {
|
||||||
|
|
||||||
private String jobName;
|
private String jobName;
|
||||||
|
|
||||||
private MessageType type;
|
|
||||||
|
|
||||||
private Map<String, String> body;
|
private Map<String, String> body;
|
||||||
|
|
||||||
public Message() {
|
public Message() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Message(final String workflowId, final String jobName, final MessageType type,
|
public Message(final String workflowId, final String jobName,
|
||||||
final Map<String, String> body) {
|
final Map<String, String> body) {
|
||||||
this.workflowId = workflowId;
|
this.workflowId = workflowId;
|
||||||
this.jobName = jobName;
|
this.jobName = jobName;
|
||||||
this.type = type;
|
|
||||||
this.body = body;
|
this.body = body;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,14 +43,6 @@ public class Message implements Serializable {
|
||||||
this.jobName = jobName;
|
this.jobName = jobName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public MessageType getType() {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setType(final MessageType type) {
|
|
||||||
this.type = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, String> getBody() {
|
public Map<String, String> getBody() {
|
||||||
return body;
|
return body;
|
||||||
}
|
}
|
||||||
|
@ -64,6 +53,6 @@ public class Message implements Serializable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return String.format("Message [workflowId=%s, jobName=%s, type=%s, body=%s]", workflowId, jobName, type, body);
|
return String.format("Message [workflowId=%s, jobName=%s, body=%s]", workflowId, jobName, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
|
|
||||||
package eu.dnetlib.message;
|
package eu.dnetlib.dhp.message;
|
||||||
|
|
||||||
|
import org.apache.http.client.config.RequestConfig;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpPut;
|
import org.apache.http.client.methods.HttpPut;
|
||||||
import org.apache.http.entity.SerializableEntity;
|
import org.apache.http.entity.SerializableEntity;
|
||||||
|
@ -13,6 +14,12 @@ public class MessageSender {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
|
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
|
||||||
|
|
||||||
|
private static final int SOCKET_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
|
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
|
private static final int CONNTECTION_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private final String dnetMessageEndpoint;
|
private final String dnetMessageEndpoint;
|
||||||
|
|
||||||
public MessageSender(final String dnetMessageEndpoint) {
|
public MessageSender(final String dnetMessageEndpoint) {
|
||||||
|
@ -20,10 +27,25 @@ public class MessageSender {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(final Message message) {
|
public void sendMessage(final Message message) {
|
||||||
|
new Thread(() -> _sendMessage(message)).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void _sendMessage(final Message message) {
|
||||||
final HttpPut req = new HttpPut(dnetMessageEndpoint);
|
final HttpPut req = new HttpPut(dnetMessageEndpoint);
|
||||||
req.setEntity(new SerializableEntity(message));
|
req.setEntity(new SerializableEntity(message));
|
||||||
|
|
||||||
try (final CloseableHttpClient client = HttpClients.createDefault();
|
final RequestConfig requestConfig = RequestConfig
|
||||||
|
.custom()
|
||||||
|
.setConnectTimeout(CONNTECTION_TIMEOUT_MS)
|
||||||
|
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
|
||||||
|
.setSocketTimeout(SOCKET_TIMEOUT_MS)
|
||||||
|
.build();
|
||||||
|
;
|
||||||
|
|
||||||
|
try (final CloseableHttpClient client = HttpClients
|
||||||
|
.custom()
|
||||||
|
.setDefaultRequestConfig(requestConfig)
|
||||||
|
.build();
|
||||||
final CloseableHttpResponse response = client.execute(req)) {
|
final CloseableHttpResponse response = client.execute(req)) {
|
||||||
log.debug("Sent Message to " + dnetMessageEndpoint);
|
log.debug("Sent Message to " + dnetMessageEndpoint);
|
||||||
log.debug("MESSAGE:" + message);
|
log.debug("MESSAGE:" + message);
|
|
@ -1,6 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.message;
|
|
||||||
|
|
||||||
public enum MessageType {
|
|
||||||
ONGOING, REPORT
|
|
||||||
}
|
|
Loading…
Reference in New Issue