1
0
Fork 0

Merge branch 'hadoop_aggregator' of https://code-repo.d4science.org/D-Net/dnet-hadoop into hadoop_aggregator

This commit is contained in:
Claudio Atzori 2021-02-03 17:58:29 +01:00
commit 4758b58aa2
3 changed files with 110 additions and 0 deletions

View File

@ -0,0 +1,69 @@
package eu.dnetlib.message;
import java.io.Serializable;
import java.util.Map;
public class Message implements Serializable {
/**
*
*/
private static final long serialVersionUID = 401753881204524893L;
private String workflowId;
private String jobName;
private MessageType type;
private Map<String, String> body;
public Message() {
}
public Message(final String workflowId, final String jobName, final MessageType type,
final Map<String, String> body) {
this.workflowId = workflowId;
this.jobName = jobName;
this.type = type;
this.body = body;
}
public String getWorkflowId() {
return workflowId;
}
public void setWorkflowId(final String workflowId) {
this.workflowId = workflowId;
}
public String getJobName() {
return jobName;
}
public void setJobName(final String jobName) {
this.jobName = jobName;
}
public MessageType getType() {
return type;
}
public void setType(final MessageType type) {
this.type = type;
}
public Map<String, String> getBody() {
return body;
}
public void setBody(final Map<String, String> body) {
this.body = body;
}
@Override
public String toString() {
return String.format("Message [workflowId=%s, jobName=%s, type=%s, body=%s]", workflowId, jobName, type, body);
}
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.message;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.SerializableEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageSender {
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
private final String dnetMessageEndpoint;
public MessageSender(final String dnetMessageEndpoint) {
this.dnetMessageEndpoint = dnetMessageEndpoint;
}
public void sendMessage(final Message message) {
final HttpPut req = new HttpPut(dnetMessageEndpoint);
req.setEntity(new SerializableEntity(message));
try (final CloseableHttpClient client = HttpClients.createDefault();
final CloseableHttpResponse response = client.execute(req)) {
log.debug("Sent Message to " + dnetMessageEndpoint);
log.debug("MESSAGE:" + message);
} catch (final Throwable e) {
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
}
}
}

View File

@ -0,0 +1,6 @@
package eu.dnetlib.message;
public enum MessageType {
ONGOING, REPORT
}