diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java index 57844d4903..978af6dd83 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java @@ -2,10 +2,15 @@ package eu.dnetlib.dhp.message; import java.io.Serializable; +import java.util.HashMap; import java.util.Map; public class Message implements Serializable { + public static String CURRENT_PARAM = "current"; + public static String TOTAL_PARAM = "total"; + + /** * */ @@ -13,17 +18,14 @@ public class Message implements Serializable { private String workflowId; - private String jobName; - private Map body; public Message() { + body = new HashMap<>(); } - public Message(final String workflowId, final String jobName, - final Map body) { + public Message(final String workflowId, final Map body) { this.workflowId = workflowId; - this.jobName = jobName; this.body = body; } @@ -35,14 +37,6 @@ public class Message implements Serializable { this.workflowId = workflowId; } - public String getJobName() { - return jobName; - } - - public void setJobName(final String jobName) { - this.jobName = jobName; - } - public Map getBody() { return body; } @@ -53,6 +47,6 @@ public class Message implements Serializable { @Override public String toString() { - return String.format("Message [workflowId=%s, jobName=%s, body=%s]", workflowId, jobName, body); + return String.format("Message [workflowId=%s, body=%s]", workflowId, body); } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java index 70eb594f82..35ecaa50cb 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java @@ -10,48 +10,71 @@ import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + public class MessageSender { - private static final Logger log = LoggerFactory.getLogger(MessageSender.class); + private static final Logger log = LoggerFactory.getLogger(MessageSender.class); - private static final int SOCKET_TIMEOUT_MS = 2000; + private static final int SOCKET_TIMEOUT_MS = 2000; - private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000; + private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000; - private static final int CONNTECTION_TIMEOUT_MS = 2000; + private static final int CONNTECTION_TIMEOUT_MS = 2000; - private final String dnetMessageEndpoint; + private final String dnetMessageEndpoint; - public MessageSender(final String dnetMessageEndpoint) { - this.dnetMessageEndpoint = dnetMessageEndpoint; - } + private final String workflowId; - public void sendMessage(final Message message) { - new Thread(() -> _sendMessage(message)).start(); - } - private void _sendMessage(final Message message) { - final HttpPut req = new HttpPut(dnetMessageEndpoint); - req.setEntity(new SerializableEntity(message)); + public MessageSender(final String dnetMessageEndpoint, final String workflowId) { + this.workflowId = workflowId; + this.dnetMessageEndpoint = dnetMessageEndpoint; + } - final RequestConfig requestConfig = RequestConfig - .custom() - .setConnectTimeout(CONNTECTION_TIMEOUT_MS) - .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) - .setSocketTimeout(SOCKET_TIMEOUT_MS) - .build(); - ; + public void sendMessage(final Message message) { + new Thread(() -> _sendMessage(message)).start(); + } - try (final CloseableHttpClient client = HttpClients - .custom() - .setDefaultRequestConfig(requestConfig) - .build(); - final CloseableHttpResponse response = client.execute(req)) { - log.debug("Sent Message to " + dnetMessageEndpoint); - log.debug("MESSAGE:" + message); - } catch (final Throwable e) { - log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e); - } - } + public void sendMessage(final Long current, final Long total) { + sendMessage(createMessage(current, total)); + } + + + private Message createMessage(final Long current, final Long total) { + + final Message m = new Message(); + m.setWorkflowId(workflowId); + m.getBody().put(Message.CURRENT_PARAM, current.toString()); + if (total != null) + m.getBody().put(Message.TOTAL_PARAM, total.toString()); + return m; + } + + + private void _sendMessage(final Message message) { + final HttpPut req = new HttpPut(dnetMessageEndpoint); + req.setEntity(new SerializableEntity(message)); + + final RequestConfig requestConfig = RequestConfig + .custom() + .setConnectTimeout(CONNTECTION_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .build(); + ; + + try (final CloseableHttpClient client = HttpClients + .custom() + .setDefaultRequestConfig(requestConfig) + .build(); + final CloseableHttpResponse response = client.execute(req)) { + log.debug("Sent Message to " + dnetMessageEndpoint); + log.debug("MESSAGE:" + message); + } catch (final Throwable e) { + log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e); + } + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java index f1d3aec9cd..9d82a1ed4d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java @@ -5,6 +5,8 @@ import java.io.IOException; import java.net.URI; import java.util.concurrent.atomic.AtomicInteger; +import eu.dnetlib.dhp.message.Message; +import eu.dnetlib.dhp.message.MessageSender; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,13 +31,18 @@ public class CollectorWorker { private final String hdfsPath; + private final MessageSender messageSender; + + public CollectorWorker( final ApiDescriptor api, final String hdfsuri, - final String hdfsPath) { + final String hdfsPath, + final MessageSender messageSender) { this.api = api; this.hdfsuri = hdfsuri; this.hdfsPath = hdfsPath; + this.messageSender = messageSender; } public CollectorPluginErrorLogList collect() throws IOException, CollectorException { @@ -58,6 +65,7 @@ public class CollectorWorker { final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol()); final AtomicInteger counter = new AtomicInteger(0); + try (SequenceFile.Writer writer = SequenceFile .createWriter( conf, @@ -71,6 +79,8 @@ public class CollectorWorker { .forEach( content -> { key.set(counter.getAndIncrement()); + if (counter.get()% 500 == 0) + messageSender.sendMessage(counter.longValue(), null); value.set(content); try { writer.append(key, value); @@ -79,6 +89,7 @@ public class CollectorWorker { } }); } finally { + messageSender.sendMessage(counter.longValue(),counter.longValue()); return plugin.getCollectionErrors(); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java index 7ec830879c..d8e8a8e49e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java @@ -7,6 +7,8 @@ import static eu.dnetlib.dhp.application.ApplicationUtils.*; import java.io.IOException; +import eu.dnetlib.dhp.message.Message; +import eu.dnetlib.dhp.message.MessageSender; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; @@ -57,17 +59,27 @@ public class CollectorWorkerApplication { final String mdStoreVersion = argumentParser.get("mdStoreVersion"); log.info("mdStoreVersion is {}", mdStoreVersion); + final String dnetMessageManagerURL = argumentParser.get("dnetMessageManagerURL"); + log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL); + + final String workflowId = argumentParser.get("workflowId"); + log.info("workflowId is {}", workflowId); + + final MessageSender ms = new MessageSender(dnetMessageManagerURL,workflowId); + final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME; log.info("hdfs path is {}", hdfsPath); final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class); - final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath); + final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath, ms); CollectorPluginErrorLogList errors = worker.collect(); populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString()); } + + } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json index 60e9762ff2..6ccba468ab 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json @@ -17,10 +17,18 @@ "paramDescription": "the MDStore Version bean", "paramRequired": true }, + { + "paramName": "dm", + "paramLongName": "dnetMessageManagerURL", + "paramDescription": "the End point URL to send Messages", + "paramRequired": true + }, + + { "paramName": "w", "paramLongName": "workflowId", "paramDescription": "the identifier of the dnet Workflow", - "paramRequired": false + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml index 595613a2ee..b74ef6b61b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml @@ -32,6 +32,11 @@ mdStoreManagerURI The URI of the MDStore Manager + + + dnetMessageManagerURL + The URI of the Dnet Message Manager + collectionMode Should be REFRESH or INCREMENTAL @@ -86,7 +91,10 @@ eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication --apidescriptor${apiDescription} --namenode${nameNode} + --workflowId${workflowId} + --dnetMessageManagerURL${dnetMessageManagerURL} --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} + diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java index 10964096cc..9066e32b63 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java @@ -36,17 +36,6 @@ public class DnetCollectorWorkerApplicationTests { assertNotNull(mapper.writeValueAsString(api)); } - @Test - public void testFeeding(@TempDir Path testDir) throws Exception { - - System.out.println(testDir.toString()); - CollectorWorker worker = new CollectorWorker(getApi(), - "file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq"); - worker.collect(); - - // TODO create ASSERT HERE - } - private ApiDescriptor getApi() { final ApiDescriptor api = new ApiDescriptor(); api.setId("oai");