Merge branch 'hadoop_aggregator' of https://code-repo.d4science.org/D-Net/dnet-hadoop into hadoop_aggregator

This commit is contained in:
Claudio Atzori 2021-02-04 17:25:00 +01:00
commit 730973679a
7 changed files with 105 additions and 60 deletions

View File

@ -2,10 +2,15 @@
package eu.dnetlib.dhp.message; package eu.dnetlib.dhp.message;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class Message implements Serializable { public class Message implements Serializable {
public static String CURRENT_PARAM = "current";
public static String TOTAL_PARAM = "total";
/** /**
* *
*/ */
@ -13,17 +18,14 @@ public class Message implements Serializable {
private String workflowId; private String workflowId;
private String jobName;
private Map<String, String> body; private Map<String, String> body;
public Message() { public Message() {
body = new HashMap<>();
} }
public Message(final String workflowId, final String jobName, public Message(final String workflowId, final Map<String, String> body) {
final Map<String, String> body) {
this.workflowId = workflowId; this.workflowId = workflowId;
this.jobName = jobName;
this.body = body; this.body = body;
} }
@ -35,14 +37,6 @@ public class Message implements Serializable {
this.workflowId = workflowId; this.workflowId = workflowId;
} }
public String getJobName() {
return jobName;
}
public void setJobName(final String jobName) {
this.jobName = jobName;
}
public Map<String, String> getBody() { public Map<String, String> getBody() {
return body; return body;
} }
@ -53,6 +47,6 @@ public class Message implements Serializable {
@Override @Override
public String toString() { public String toString() {
return String.format("Message [workflowId=%s, jobName=%s, body=%s]", workflowId, jobName, body); return String.format("Message [workflowId=%s, body=%s]", workflowId, body);
} }
} }

View File

@ -10,48 +10,71 @@ import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class MessageSender { public class MessageSender {
private static final Logger log = LoggerFactory.getLogger(MessageSender.class); private static final Logger log = LoggerFactory.getLogger(MessageSender.class);
private static final int SOCKET_TIMEOUT_MS = 2000; private static final int SOCKET_TIMEOUT_MS = 2000;
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000; private static final int CONNECTION_REQUEST_TIMEOUT_MS = 2000;
private static final int CONNTECTION_TIMEOUT_MS = 2000; private static final int CONNTECTION_TIMEOUT_MS = 2000;
private final String dnetMessageEndpoint; private final String dnetMessageEndpoint;
public MessageSender(final String dnetMessageEndpoint) { private final String workflowId;
this.dnetMessageEndpoint = dnetMessageEndpoint;
}
public void sendMessage(final Message message) {
new Thread(() -> _sendMessage(message)).start();
}
private void _sendMessage(final Message message) { public MessageSender(final String dnetMessageEndpoint, final String workflowId) {
final HttpPut req = new HttpPut(dnetMessageEndpoint); this.workflowId = workflowId;
req.setEntity(new SerializableEntity(message)); this.dnetMessageEndpoint = dnetMessageEndpoint;
}
final RequestConfig requestConfig = RequestConfig public void sendMessage(final Message message) {
.custom() new Thread(() -> _sendMessage(message)).start();
.setConnectTimeout(CONNTECTION_TIMEOUT_MS) }
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.build();
;
try (final CloseableHttpClient client = HttpClients public void sendMessage(final Long current, final Long total) {
.custom() sendMessage(createMessage(current, total));
.setDefaultRequestConfig(requestConfig) }
.build();
final CloseableHttpResponse response = client.execute(req)) {
log.debug("Sent Message to " + dnetMessageEndpoint); private Message createMessage(final Long current, final Long total) {
log.debug("MESSAGE:" + message);
} catch (final Throwable e) { final Message m = new Message();
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e); m.setWorkflowId(workflowId);
} m.getBody().put(Message.CURRENT_PARAM, current.toString());
} if (total != null)
m.getBody().put(Message.TOTAL_PARAM, total.toString());
return m;
}
private void _sendMessage(final Message message) {
final HttpPut req = new HttpPut(dnetMessageEndpoint);
req.setEntity(new SerializableEntity(message));
final RequestConfig requestConfig = RequestConfig
.custom()
.setConnectTimeout(CONNTECTION_TIMEOUT_MS)
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.build();
;
try (final CloseableHttpClient client = HttpClients
.custom()
.setDefaultRequestConfig(requestConfig)
.build();
final CloseableHttpResponse response = client.execute(req)) {
log.debug("Sent Message to " + dnetMessageEndpoint);
log.debug("MESSAGE:" + message);
} catch (final Throwable e) {
log.error("Error sending message to " + dnetMessageEndpoint + ", message content: " + message, e);
}
}
} }

View File

@ -5,6 +5,8 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import eu.dnetlib.dhp.message.Message;
import eu.dnetlib.dhp.message.MessageSender;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -29,13 +31,18 @@ public class CollectorWorker {
private final String hdfsPath; private final String hdfsPath;
private final MessageSender messageSender;
public CollectorWorker( public CollectorWorker(
final ApiDescriptor api, final ApiDescriptor api,
final String hdfsuri, final String hdfsuri,
final String hdfsPath) { final String hdfsPath,
final MessageSender messageSender) {
this.api = api; this.api = api;
this.hdfsuri = hdfsuri; this.hdfsuri = hdfsuri;
this.hdfsPath = hdfsPath; this.hdfsPath = hdfsPath;
this.messageSender = messageSender;
} }
public CollectorPluginErrorLogList collect() throws IOException, CollectorException { public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
@ -58,6 +65,7 @@ public class CollectorWorker {
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol()); final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(
conf, conf,
@ -71,6 +79,8 @@ public class CollectorWorker {
.forEach( .forEach(
content -> { content -> {
key.set(counter.getAndIncrement()); key.set(counter.getAndIncrement());
if (counter.get()% 500 == 0)
messageSender.sendMessage(counter.longValue(), null);
value.set(content); value.set(content);
try { try {
writer.append(key, value); writer.append(key, value);
@ -79,6 +89,7 @@ public class CollectorWorker {
} }
}); });
} finally { } finally {
messageSender.sendMessage(counter.longValue(),counter.longValue());
return plugin.getCollectionErrors(); return plugin.getCollectionErrors();
} }
} }

View File

@ -7,6 +7,8 @@ import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import java.io.IOException; import java.io.IOException;
import eu.dnetlib.dhp.message.Message;
import eu.dnetlib.dhp.message.MessageSender;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -57,17 +59,27 @@ public class CollectorWorkerApplication {
final String mdStoreVersion = argumentParser.get("mdStoreVersion"); final String mdStoreVersion = argumentParser.get("mdStoreVersion");
log.info("mdStoreVersion is {}", mdStoreVersion); log.info("mdStoreVersion is {}", mdStoreVersion);
final String dnetMessageManagerURL = argumentParser.get("dnetMessageManagerURL");
log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL);
final String workflowId = argumentParser.get("workflowId");
log.info("workflowId is {}", workflowId);
final MessageSender ms = new MessageSender(dnetMessageManagerURL,workflowId);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME; final String hdfsPath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("hdfs path is {}", hdfsPath); log.info("hdfs path is {}", hdfsPath);
final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class); final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);
final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath); final CollectorWorker worker = new CollectorWorker(api, hdfsuri, hdfsPath, ms);
CollectorPluginErrorLogList errors = worker.collect(); CollectorPluginErrorLogList errors = worker.collect();
populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString()); populateOOZIEEnv(COLLECTOR_WORKER_ERRORS, errors.toString());
} }
} }

View File

@ -17,10 +17,18 @@
"paramDescription": "the MDStore Version bean", "paramDescription": "the MDStore Version bean",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "dm",
"paramLongName": "dnetMessageManagerURL",
"paramDescription": "the End point URL to send Messages",
"paramRequired": true
},
{ {
"paramName": "w", "paramName": "w",
"paramLongName": "workflowId", "paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow", "paramDescription": "the identifier of the dnet Workflow",
"paramRequired": false "paramRequired": true
} }
] ]

View File

@ -32,6 +32,11 @@
<name>mdStoreManagerURI</name> <name>mdStoreManagerURI</name>
<description>The URI of the MDStore Manager</description> <description>The URI of the MDStore Manager</description>
</property> </property>
<property>
<name>dnetMessageManagerURL</name>
<description>The URI of the Dnet Message Manager</description>
</property>
<property> <property>
<name>collectionMode</name> <name>collectionMode</name>
<description>Should be REFRESH or INCREMENTAL</description> <description>Should be REFRESH or INCREMENTAL</description>
@ -86,7 +91,10 @@
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class> <main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg> <arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg> <arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--workflowId</arg><arg>${workflowId}</arg>
<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg> <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<capture-output/> <capture-output/>
</java> </java>
<ok to="GenerateNativeStoreSparkJob"/> <ok to="GenerateNativeStoreSparkJob"/>

View File

@ -36,17 +36,6 @@ public class CollectorWorkerApplicationTests {
assertNotNull(mapper.writeValueAsString(api)); assertNotNull(mapper.writeValueAsString(api));
} }
@Test
public void testFeeding(@TempDir Path testDir) throws Exception {
System.out.println(testDir.toString());
CollectorWorker worker = new CollectorWorker(getApi(),
"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
worker.collect();
// TODO create ASSERT HERE
}
private ApiDescriptor getApi() { private ApiDescriptor getApi() {
final ApiDescriptor api = new ApiDescriptor(); final ApiDescriptor api = new ApiDescriptor();
api.setId("oai"); api.setId("oai");