From 2ee0c3e47e4b17a7902de5b4f90e215c83bdd763 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 5 Feb 2021 09:45:39 +0100 Subject: [PATCH] http entity as json string --- .../java/eu/dnetlib/dhp/message/Message.java | 1 - .../eu/dnetlib/dhp/message/MessageSender.java | 105 ++++++++++-------- 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java index 978af6dd8..ed2a3c9b3 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java @@ -10,7 +10,6 @@ public class Message implements Serializable { public static String CURRENT_PARAM = "current"; public static String TOTAL_PARAM = "total"; - /** * */ diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java index 35ecaa50c..3f9d07a7e 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java @@ -4,77 +4,84 @@ package eu.dnetlib.dhp.message; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.SerializableEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; public class MessageSender { - private static final Logger log = LoggerFactory.getLogger(MessageSender.class); + private static final Logger log = LoggerFactory.getLogger(MessageSender.class); - private static final int SOCKET_TIMEOUT_MS = 2000; + private static final int SOCKET_TIMEOUT_MS = 2000; - private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000; + private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000; - private static final int CONNTECTION_TIMEOUT_MS = 2000; + private static final int CONNTECTION_TIMEOUT_MS = 2000; - private final String dnetMessageEndpoint; + private final ObjectMapper objectMapper = new ObjectMapper(); - private final String workflowId; + private final String dnetMessageEndpoint; + private final String workflowId; - public MessageSender(final String dnetMessageEndpoint, final String workflowId) { - this.workflowId = workflowId; - this.dnetMessageEndpoint = dnetMessageEndpoint; - } + public MessageSender(final String dnetMessageEndpoint, final String workflowId) { + this.workflowId = workflowId; + this.dnetMessageEndpoint = dnetMessageEndpoint; + } - public void sendMessage(final Message message) { - new Thread(() -> _sendMessage(message)).start(); - } + public void sendMessage(final Message message) { + new Thread(() -> _sendMessage(message)).start(); + } - public void sendMessage(final Long current, final Long total) { - sendMessage(createMessage(current, total)); - } + public void sendMessage(final Long current, final Long total) { + sendMessage(createMessage(current, total)); + } + private Message createMessage(final Long current, final Long total) { - private Message createMessage(final Long current, final Long total) { + final Message m = new Message(); + m.setWorkflowId(workflowId); + m.getBody().put(Message.CURRENT_PARAM, current.toString()); + if (total != null) { + m.getBody().put(Message.TOTAL_PARAM, total.toString()); + } + return m; + } - final Message m = new Message(); - m.setWorkflowId(workflowId); - m.getBody().put(Message.CURRENT_PARAM, current.toString()); - if (total != null) - m.getBody().put(Message.TOTAL_PARAM, total.toString()); - return m; - } + private void _sendMessage(final Message message) { + try { + final String json = objectMapper.writeValueAsString(message); + final HttpPut req = new HttpPut(dnetMessageEndpoint); + req.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON)); - private void _sendMessage(final Message message) { - final HttpPut req = new HttpPut(dnetMessageEndpoint); - req.setEntity(new SerializableEntity(message)); + final RequestConfig requestConfig = RequestConfig + .custom() + .setConnectTimeout(CONNTECTION_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .build(); + ; - final RequestConfig requestConfig = RequestConfig - .custom() - .setConnectTimeout(CONNTECTION_TIMEOUT_MS) - .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) - .setSocketTimeout(SOCKET_TIMEOUT_MS) - .build(); - ; - - try (final CloseableHttpClient client = HttpClients - .custom() - .setDefaultRequestConfig(requestConfig) - .build(); - final CloseableHttpResponse response = client.execute(req)) { - log.debug("Sent Message to " + dnetMessageEndpoint); - log.debug("MESSAGE:" + message); - } catch (final Throwable e) { - log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e); - } - } + try (final CloseableHttpClient client = HttpClients + .custom() + .setDefaultRequestConfig(requestConfig) + .build(); + final CloseableHttpResponse response = client.execute(req)) { + log.debug("Sent Message to " + dnetMessageEndpoint); + log.debug("MESSAGE:" + message); + } catch (final Throwable e) { + log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e); + } + } catch (final JsonProcessingException e) { + log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e); + } + } }