forked from D-Net/dnet-hadoop
http entity as json string
This commit is contained in:
parent
730973679a
commit
2ee0c3e47e
|
@ -10,7 +10,6 @@ public class Message implements Serializable {
|
||||||
public static String CURRENT_PARAM = "current";
|
public static String CURRENT_PARAM = "current";
|
||||||
public static String TOTAL_PARAM = "total";
|
public static String TOTAL_PARAM = "total";
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -4,77 +4,84 @@ package eu.dnetlib.dhp.message;
|
||||||
import org.apache.http.client.config.RequestConfig;
|
import org.apache.http.client.config.RequestConfig;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpPut;
|
import org.apache.http.client.methods.HttpPut;
|
||||||
import org.apache.http.entity.SerializableEntity;
|
import org.apache.http.entity.ContentType;
|
||||||
|
import org.apache.http.entity.StringEntity;
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
import org.apache.http.impl.client.HttpClients;
|
import org.apache.http.impl.client.HttpClients;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import java.util.function.Function;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
public class MessageSender {
|
public class MessageSender {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
|
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
|
||||||
|
|
||||||
private static final int SOCKET_TIMEOUT_MS = 2000;
|
private static final int SOCKET_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000;
|
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private static final int CONNTECTION_TIMEOUT_MS = 2000;
|
private static final int CONNTECTION_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private final String dnetMessageEndpoint;
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
|
||||||
private final String workflowId;
|
private final String dnetMessageEndpoint;
|
||||||
|
|
||||||
|
private final String workflowId;
|
||||||
|
|
||||||
public MessageSender(final String dnetMessageEndpoint, final String workflowId) {
|
public MessageSender(final String dnetMessageEndpoint, final String workflowId) {
|
||||||
this.workflowId = workflowId;
|
this.workflowId = workflowId;
|
||||||
this.dnetMessageEndpoint = dnetMessageEndpoint;
|
this.dnetMessageEndpoint = dnetMessageEndpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(final Message message) {
|
public void sendMessage(final Message message) {
|
||||||
new Thread(() -> _sendMessage(message)).start();
|
new Thread(() -> _sendMessage(message)).start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendMessage(final Long current, final Long total) {
|
public void sendMessage(final Long current, final Long total) {
|
||||||
sendMessage(createMessage(current, total));
|
sendMessage(createMessage(current, total));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Message createMessage(final Long current, final Long total) {
|
||||||
|
|
||||||
private Message createMessage(final Long current, final Long total) {
|
final Message m = new Message();
|
||||||
|
m.setWorkflowId(workflowId);
|
||||||
|
m.getBody().put(Message.CURRENT_PARAM, current.toString());
|
||||||
|
if (total != null) {
|
||||||
|
m.getBody().put(Message.TOTAL_PARAM, total.toString());
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
final Message m = new Message();
|
private void _sendMessage(final Message message) {
|
||||||
m.setWorkflowId(workflowId);
|
try {
|
||||||
m.getBody().put(Message.CURRENT_PARAM, current.toString());
|
final String json = objectMapper.writeValueAsString(message);
|
||||||
if (total != null)
|
|
||||||
m.getBody().put(Message.TOTAL_PARAM, total.toString());
|
|
||||||
return m;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
final HttpPut req = new HttpPut(dnetMessageEndpoint);
|
||||||
|
req.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));
|
||||||
|
|
||||||
private void _sendMessage(final Message message) {
|
final RequestConfig requestConfig = RequestConfig
|
||||||
final HttpPut req = new HttpPut(dnetMessageEndpoint);
|
.custom()
|
||||||
req.setEntity(new SerializableEntity(message));
|
.setConnectTimeout(CONNTECTION_TIMEOUT_MS)
|
||||||
|
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
|
||||||
|
.setSocketTimeout(SOCKET_TIMEOUT_MS)
|
||||||
|
.build();
|
||||||
|
;
|
||||||
|
|
||||||
final RequestConfig requestConfig = RequestConfig
|
try (final CloseableHttpClient client = HttpClients
|
||||||
.custom()
|
.custom()
|
||||||
.setConnectTimeout(CONNTECTION_TIMEOUT_MS)
|
.setDefaultRequestConfig(requestConfig)
|
||||||
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
|
.build();
|
||||||
.setSocketTimeout(SOCKET_TIMEOUT_MS)
|
final CloseableHttpResponse response = client.execute(req)) {
|
||||||
.build();
|
log.debug("Sent Message to " + dnetMessageEndpoint);
|
||||||
;
|
log.debug("MESSAGE:" + message);
|
||||||
|
} catch (final Throwable e) {
|
||||||
try (final CloseableHttpClient client = HttpClients
|
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
|
||||||
.custom()
|
}
|
||||||
.setDefaultRequestConfig(requestConfig)
|
} catch (final JsonProcessingException e) {
|
||||||
.build();
|
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
|
||||||
final CloseableHttpResponse response = client.execute(req)) {
|
}
|
||||||
log.debug("Sent Message to " + dnetMessageEndpoint);
|
}
|
||||||
log.debug("MESSAGE:" + message);
|
|
||||||
} catch (final Throwable e) {
|
|
||||||
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue