forked from antonis.lempesis/dnet-hadoop
implemented messaging btween collection worker and Dnet
This commit is contained in:
parent
72c57b28fa
commit
4dae5e605d
|
@ -2,10 +2,15 @@
|
||||||
package eu.dnetlib.dhp.message;
|
package eu.dnetlib.dhp.message;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class Message implements Serializable {
|
public class Message implements Serializable {
|
||||||
|
|
||||||
|
public static String CURRENT_PARAM = "current";
|
||||||
|
public static String TOTAL_PARAM = "total";
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -13,17 +18,14 @@ public class Message implements Serializable {
|
||||||
|
|
||||||
private String workflowId;
|
private String workflowId;
|
||||||
|
|
||||||
private String jobName;
|
|
||||||
|
|
||||||
private Map<String, String> body;
|
private Map<String, String> body;
|
||||||
|
|
||||||
public Message() {
|
public Message() {
|
||||||
|
body = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Message(final String workflowId, final String jobName,
|
public Message(final String workflowId, final Map<String, String> body) {
|
||||||
final Map<String, String> body) {
|
|
||||||
this.workflowId = workflowId;
|
this.workflowId = workflowId;
|
||||||
this.jobName = jobName;
|
|
||||||
this.body = body;
|
this.body = body;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,14 +37,6 @@ public class Message implements Serializable {
|
||||||
this.workflowId = workflowId;
|
this.workflowId = workflowId;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getJobName() {
|
|
||||||
return jobName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setJobName(final String jobName) {
|
|
||||||
this.jobName = jobName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, String> getBody() {
|
public Map<String, String> getBody() {
|
||||||
return body;
|
return body;
|
||||||
}
|
}
|
||||||
|
@ -53,6 +47,6 @@ public class Message implements Serializable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return String.format("Message [workflowId=%s, jobName=%s, body=%s]", workflowId, jobName, body);
|
return String.format("Message [workflowId=%s, body=%s]", workflowId, body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,48 +10,71 @@ import org.apache.http.impl.client.HttpClients;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class MessageSender {
|
public class MessageSender {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
|
private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
|
||||||
|
|
||||||
private static final int SOCKET_TIMEOUT_MS = 2000;
|
private static final int SOCKET_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000;
|
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private static final int CONNTECTION_TIMEOUT_MS = 2000;
|
private static final int CONNTECTION_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
private final String dnetMessageEndpoint;
|
private final String dnetMessageEndpoint;
|
||||||
|
|
||||||
public MessageSender(final String dnetMessageEndpoint) {
|
private final String workflowId;
|
||||||
this.dnetMessageEndpoint = dnetMessageEndpoint;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void sendMessage(final Message message) {
|
|
||||||
new Thread(() -> _sendMessage(message)).start();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void _sendMessage(final Message message) {
|
public MessageSender(final String dnetMessageEndpoint, final String workflowId) {
|
||||||
final HttpPut req = new HttpPut(dnetMessageEndpoint);
|
this.workflowId = workflowId;
|
||||||
req.setEntity(new SerializableEntity(message));
|
this.dnetMessageEndpoint = dnetMessageEndpoint;
|
||||||
|
}
|
||||||
|
|
||||||
final RequestConfig requestConfig = RequestConfig
|
public void sendMessage(final Message message) {
|
||||||
.custom()
|
new Thread(() -> _sendMessage(message)).start();
|
||||||
.setConnectTimeout(CONNTECTION_TIMEOUT_MS)
|
}
|
||||||
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
|
|
||||||
.setSocketTimeout(SOCKET_TIMEOUT_MS)
|
|
||||||
.build();
|
|
||||||
;
|
|
||||||
|
|
||||||
try (final CloseableHttpClient client = HttpClients
|
public void sendMessage(final Long current, final Long total) {
|
||||||
.custom()
|
sendMessage(createMessage(current, total));
|
||||||
.setDefaultRequestConfig(requestConfig)
|
}
|
||||||
.build();
|
|
||||||
final CloseableHttpResponse response = client.execute(req)) {
|
|
||||||
log.debug("Sent Message to " + dnetMessageEndpoint);
|
private Message createMessage(final Long current, final Long total) {
|
||||||
log.debug("MESSAGE:" + message);
|
|
||||||
} catch (final Throwable e) {
|
final Message m = new Message();
|
||||||
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
|
m.setWorkflowId(workflowId);
|
||||||
}
|
m.getBody().put(Message.CURRENT_PARAM, current.toString());
|
||||||
}
|
if (total != null)
|
||||||
|
m.getBody().put(Message.TOTAL_PARAM, total.toString());
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void _sendMessage(final Message message) {
|
||||||
|
final HttpPut req = new HttpPut(dnetMessageEndpoint);
|
||||||
|
req.setEntity(new SerializableEntity(message));
|
||||||
|
|
||||||
|
final RequestConfig requestConfig = RequestConfig
|
||||||
|
.custom()
|
||||||
|
.setConnectTimeout(CONNTECTION_TIMEOUT_MS)
|
||||||
|
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
|
||||||
|
.setSocketTimeout(SOCKET_TIMEOUT_MS)
|
||||||
|
.build();
|
||||||
|
;
|
||||||
|
|
||||||
|
try (final CloseableHttpClient client = HttpClients
|
||||||
|
.custom()
|
||||||
|
.setDefaultRequestConfig(requestConfig)
|
||||||
|
.build();
|
||||||
|
final CloseableHttpResponse response = client.execute(req)) {
|
||||||
|
log.debug("Sent Message to " + dnetMessageEndpoint);
|
||||||
|
log.debug("MESSAGE:" + message);
|
||||||
|
} catch (final Throwable e) {
|
||||||
|
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,8 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.message.Message;
|
||||||
|
import eu.dnetlib.dhp.message.MessageSender;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -29,13 +31,18 @@ public class CollectorWorker {
|
||||||
|
|
||||||
private final String hdfsPath;
|
private final String hdfsPath;
|
||||||
|
|
||||||
|
private final MessageSender messageSender;
|
||||||
|
|
||||||
|
|
||||||
public CollectorWorker(
|
public CollectorWorker(
|
||||||
final ApiDescriptor api,
|
final ApiDescriptor api,
|
||||||
final String hdfsuri,
|
final String hdfsuri,
|
||||||
final String hdfsPath) {
|
final String hdfsPath,
|
||||||
|
final MessageSender messageSender) {
|
||||||
this.api = api;
|
this.api = api;
|
||||||
this.hdfsuri = hdfsuri;
|
this.hdfsuri = hdfsuri;
|
||||||
this.hdfsPath = hdfsPath;
|
this.hdfsPath = hdfsPath;
|
||||||
|
this.messageSender = messageSender;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
|
public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
|
||||||
|
@ -58,6 +65,7 @@ public class CollectorWorker {
|
||||||
|
|
||||||
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
|
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
|
||||||
final AtomicInteger counter = new AtomicInteger(0);
|
final AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
|
||||||
try (SequenceFile.Writer writer = SequenceFile
|
try (SequenceFile.Writer writer = SequenceFile
|
||||||
.createWriter(
|
.createWriter(
|
||||||
conf,
|
conf,
|
||||||
|
@ -71,6 +79,8 @@ public class CollectorWorker {
|
||||||
.forEach(
|
.forEach(
|
||||||
content -> {
|
content -> {
|
||||||
key.set(counter.getAndIncrement());
|
key.set(counter.getAndIncrement());
|
||||||
|
if (counter.get()% 500 == 0)
|
||||||
|
messageSender.sendMessage(counter.longValue(), null);
|
||||||
value.set(content);
|
value.set(content);
|
||||||
try {
|
try {
|
||||||
writer.append(key, value);
|
writer.append(key, value);
|
||||||
|
@ -79,6 +89,7 @@ public class CollectorWorker {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} finally {
|
} finally {
|
||||||
|
messageSender.sendMessage(counter.longValue(),counter.longValue());
|
||||||
return plugin.getCollectionErrors();
|
return plugin.getCollectionErrors();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,8 @@ import static eu.dnetlib.dhp.application.ApplicationUtils.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.message.Message;
|
||||||
|
import eu.dnetlib.dhp.message.MessageSender;
|
||||||
import org.apache.commons.cli.ParseException;
|
import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -57,17 +59,27 @@ public class CollectorWorkerApplication {
|
||||||
final String mdStoreVersion = argumentParser.get("mdStoreVersion");
|
final String mdStoreVersion = argumentParser.get("mdStoreVersion");
|
||||||
log.info("mdStoreVersion is {}", mdStoreVersion);
|
log.info("mdStoreVersion is {}", mdStoreVersion);
|
||||||
|
|
||||||
|
final String dnetMessageManagerURL = argumentParser.get("dnetMessageManagerURL");
|
||||||
|
log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL);
|
||||||
|
|
||||||
|
final String workflowId = argumentParser.get("workflowId");
|
||||||
|
log.info("workflowId is {}", workflowId);
|
||||||
|
|
||||||
|
final MessageSender ms = new MessageSender(dnetMessageManagerURL,workflowId);
|
||||||
|
|
||||||
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
|
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
|
||||||
final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
|
final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
|
||||||
log.info("hdfs path is {}", hdfsPath);
|
log.info("hdfs path is {}", hdfsPath);
|
||||||
|
|
||||||
final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
|
final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
|
||||||
|
|
||||||
final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath);
|
final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath, ms);
|
||||||
CollectorPluginErrorLogList errors = worker.collect();
|
CollectorPluginErrorLogList errors = worker.collect();
|
||||||
|
|
||||||
populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString());
|
populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,10 +17,18 @@
|
||||||
"paramDescription": "the MDStore Version bean",
|
"paramDescription": "the MDStore Version bean",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"paramName": "dm",
|
||||||
|
"paramLongName": "dnetMessageManagerURL",
|
||||||
|
"paramDescription": "the End point URL to send Messages",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
|
||||||
|
|
||||||
{
|
{
|
||||||
"paramName": "w",
|
"paramName": "w",
|
||||||
"paramLongName": "workflowId",
|
"paramLongName": "workflowId",
|
||||||
"paramDescription": "the identifier of the dnet Workflow",
|
"paramDescription": "the identifier of the dnet Workflow",
|
||||||
"paramRequired": false
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -32,6 +32,11 @@
|
||||||
<name>mdStoreManagerURI</name>
|
<name>mdStoreManagerURI</name>
|
||||||
<description>The URI of the MDStore Manager</description>
|
<description>The URI of the MDStore Manager</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dnetMessageManagerURL</name>
|
||||||
|
<description>The URI of the Dnet Message Manager</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>collectionMode</name>
|
<name>collectionMode</name>
|
||||||
<description>Should be REFRESH or INCREMENTAL</description>
|
<description>Should be REFRESH or INCREMENTAL</description>
|
||||||
|
@ -86,7 +91,10 @@
|
||||||
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
|
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
|
||||||
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
|
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
|
||||||
<arg>--namenode</arg><arg>${nameNode}</arg>
|
<arg>--namenode</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--workflowId</arg><arg>${workflowId}</arg>
|
||||||
|
<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
|
||||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||||
|
|
||||||
<capture-output/>
|
<capture-output/>
|
||||||
</java>
|
</java>
|
||||||
<ok to="GenerateNativeStoreSparkJob"/>
|
<ok to="GenerateNativeStoreSparkJob"/>
|
||||||
|
|
|
@ -36,17 +36,6 @@ public class DnetCollectorWorkerApplicationTests {
|
||||||
assertNotNull(mapper.writeValueAsString(api));
|
assertNotNull(mapper.writeValueAsString(api));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFeeding(@TempDir Path testDir) throws Exception {
|
|
||||||
|
|
||||||
System.out.println(testDir.toString());
|
|
||||||
CollectorWorker worker = new CollectorWorker(getApi(),
|
|
||||||
"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
|
|
||||||
worker.collect();
|
|
||||||
|
|
||||||
// TODO create ASSERT HERE
|
|
||||||
}
|
|
||||||
|
|
||||||
private ApiDescriptor getApi() {
|
private ApiDescriptor getApi() {
|
||||||
final ApiDescriptor api = new ApiDescriptor();
|
final ApiDescriptor api = new ApiDescriptor();
|
||||||
api.setId("oai");
|
api.setId("oai");
|
||||||
|
|
Loading…
Reference in New Issue