WIP: collectorWorker error reporting, added report messages

This commit is contained in:
Claudio Atzori 2021-02-15 15:08:59 +01:00
parent 523a6bfa97
commit 1abe6d1ad7
10 changed files with 57 additions and 177 deletions

View File

@ -3,31 +3,36 @@ package eu.dnetlib.dhp.message;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
public class Message implements Serializable { public class Message implements Serializable {
private static final long serialVersionUID = 401753881204524893L;
public static String CURRENT_PARAM = "current"; public static String CURRENT_PARAM = "current";
public static String TOTAL_PARAM = "total"; public static String TOTAL_PARAM = "total";
/** private MessageType messageType;
*
*/
private static final long serialVersionUID = 401753881204524893L;
private String workflowId; private String workflowId;
private Map<String, String> body; private Map<String, String> body;
public Message() { public Message(final MessageType messageType, final String workflowId) {
body = new HashMap<>(); this(messageType, workflowId, new LinkedHashMap<>());
} }
public Message(final String workflowId, final Map<String, String> body) { public Message(final MessageType messageType, final String workflowId, final Map<String, String> body) {
this.messageType = messageType;
this.workflowId = workflowId; this.workflowId = workflowId;
this.body = body; this.body = body;
} }
public MessageType getMessageType() {
return messageType;
}
public String getWorkflowId() { public String getWorkflowId() {
return workflowId; return workflowId;
} }
@ -46,6 +51,7 @@ public class Message implements Serializable {
@Override @Override
public String toString() { public String toString() {
return String.format("Message [workflowId=%s, body=%s]", workflowId, body); return String.format("Message [type=%s, workflowId=%s, body=%s]", messageType, workflowId, body);
} }
} }

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.message; package eu.dnetlib.dhp.message;
import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -45,13 +46,15 @@ public class MessageSender {
} }
public void sendMessage(final Long current, final Long total) { public void sendMessage(final Long current, final Long total) {
sendMessage(createMessage(current, total)); sendMessage(createOngoingMessage(current, total));
} }
private Message createMessage(final Long current, final Long total) { public void sendReport(final Map<String, String> report) {
sendMessage(new Message(MessageType.REPORT, workflowId, report));
}
final Message m = new Message(); private Message createOngoingMessage(final Long current, final Long total) {
m.setWorkflowId(workflowId); final Message m = new Message(MessageType.ONGOING, workflowId);
m.getBody().put(Message.CURRENT_PARAM, current.toString()); m.getBody().put(Message.CURRENT_PARAM, current.toString());
if (total != null) { if (total != null) {
m.getBody().put(Message.TOTAL_PARAM, total.toString()); m.getBody().put(Message.TOTAL_PARAM, total.toString());

View File

@ -0,0 +1,20 @@
package eu.dnetlib.dhp.message;
import java.util.Locale;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
/**
 * Kinds of messages exchanged with the D-Net message manager: periodic progress
 * updates ({@link #ONGOING}) and the final collection report ({@link #REPORT}).
 */
public enum MessageType {

	ONGOING, REPORT;

	/**
	 * Parses a {@code MessageType} from its case-insensitive string form.
	 *
	 * @param value the textual message type, e.g. "ongoing" or "REPORT"; may be {@code null}
	 * @return the matching enum constant
	 * @throws IllegalArgumentException if {@code value} is {@code null} or matches no constant
	 */
	public static MessageType from(String value) {
		// static factory: the original instance method could not be called without
		// already holding a MessageType, which defeated its purpose
		return Optional
			.ofNullable(value)
			// Locale.ROOT avoids locale-sensitive casing surprises (e.g. Turkish dotless i)
			.map(s -> s.toUpperCase(Locale.ROOT))
			.map(MessageType::parseOrNull)
			.orElseThrow(() -> new IllegalArgumentException("unknown message type: " + value));
	}

	// valueOf throws its own IllegalArgumentException for unknown names; mapping
	// that case to null (-> empty Optional) lets the single "unknown message type"
	// error above cover both null and unrecognized input uniformly.
	private static MessageType parseOrNull(String upper) {
		try {
			return valueOf(upper);
		} catch (IllegalArgumentException e) {
			return null;
		}
	}
}

View File

@ -1,57 +1,39 @@
package eu.dnetlib.dhp.collection; package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Objects; import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonIgnore; import eu.dnetlib.dhp.message.MessageSender;
public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable { public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class); private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class);
@JsonIgnore private MessageSender messageSender;
private Path path;
@JsonIgnore
private FSDataOutputStream fos;
public static String SUCCESS = "success";
public CollectorPluginReport() { public CollectorPluginReport() {
} }
public CollectorPluginReport(FileSystem fs, Path path) throws IOException { public CollectorPluginReport(MessageSender messageSender) throws IOException {
this.path = path; this.messageSender = messageSender;
this.fos = fs.create(path);
} }
public Boolean isSuccess() { public void ongoing(Long current, Long total) {
return containsKey(SUCCESS) && Boolean.valueOf(get(SUCCESS)); messageSender.sendMessage(current, total);
}
public void setSuccess(Boolean success) {
put(SUCCESS, String.valueOf(success));
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
final String data = MAPPER.writeValueAsString(this); if (Objects.nonNull(messageSender)) {
if (Objects.nonNull(fos)) { log.info("closing report: ");
log.info("writing report {} to {}", data, path.toString()); this.forEach((k, v) -> log.info("{} - {}", k, v));
IOUtils.write(data, fos); messageSender.sendReport(this);
populateOOZIEEnv(this);
} }
} }
} }

View File

@ -38,20 +38,16 @@ public class CollectorWorker {
private final CollectorPluginReport report; private final CollectorPluginReport report;
private final MessageSender messageSender;
public CollectorWorker( public CollectorWorker(
final ApiDescriptor api, final ApiDescriptor api,
final FileSystem fileSystem, final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion, final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams, final HttpClientParams clientParams,
final MessageSender messageSender,
final CollectorPluginReport report) { final CollectorPluginReport report) {
this.api = api; this.api = api;
this.fileSystem = fileSystem; this.fileSystem = fileSystem;
this.mdStoreVersion = mdStoreVersion; this.mdStoreVersion = mdStoreVersion;
this.clientParams = clientParams; this.clientParams = clientParams;
this.messageSender = messageSender;
this.report = report; this.report = report;
} }
@ -78,23 +74,21 @@ public class CollectorWorker {
content -> { content -> {
key.set(counter.getAndIncrement()); key.set(counter.getAndIncrement());
if (counter.get() % 500 == 0) if (counter.get() % 500 == 0)
messageSender.sendMessage(counter.longValue(), null); report.ongoing(counter.longValue(), null);
value.set(content); value.set(content);
try { try {
writer.append(key, value); writer.append(key, value);
} catch (Throwable e) { } catch (Throwable e) {
report.put(e.getClass().getName(), e.getMessage()); report.put(e.getClass().getName(), e.getMessage());
log.warn("setting report to failed"); log.warn("setting report to failed");
report.setSuccess(false);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} catch (Throwable e) { } catch (Throwable e) {
report.put(e.getClass().getName(), e.getMessage()); report.put(e.getClass().getName(), e.getMessage());
log.warn("setting report to failed"); log.warn("setting report to failed");
report.setSuccess(false);
} finally { } finally {
messageSender.sendMessage(counter.longValue(), counter.longValue()); report.ongoing(counter.longValue(), counter.longValue());
} }
} }

View File

@ -77,22 +77,15 @@ public class CollectorWorkerApplication {
} }
protected void run(String mdStoreVersion, HttpClientParams clientParams, ApiDescriptor api, protected void run(String mdStoreVersion, HttpClientParams clientParams, ApiDescriptor api,
String dnetMessageManagerURL, String workflowId) throws IOException { String dnetMessageManagerURL, String workflowId)
throws IOException, CollectorException, UnknownCollectorPluginException {
final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId); final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME; try (CollectorPluginReport report = new CollectorPluginReport(ms)) {
log.info("report path is {}", reportPath); new CollectorWorker(api, fileSystem, currentVersion, clientParams, report).collect();
try (CollectorPluginReport report = new CollectorPluginReport(fileSystem, new Path(reportPath))) {
final CollectorWorker worker = new CollectorWorker(api, fileSystem, currentVersion, clientParams, ms,
report);
worker.collect();
report.setSuccess(true);
} catch (Throwable e) {
log.info("got exception {}, ignoring", e.getMessage());
} }
} }

View File

@ -1,62 +0,0 @@
package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.Constants.REPORT_FILE_NAME;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import java.util.Objects;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
/**
 * CollectorWorkerReporter: standalone Oozie action entry point that reads the
 * collection report written by the CollectorWorker from HDFS and fails the
 * workflow (via CollectorException) when the report is missing or unsuccessful.
 */
public class CollectorWorkerReporter {
private static final Logger log = LoggerFactory.getLogger(CollectorWorkerReporter.class);
/**
 * Parses CLI arguments, loads the report JSON from HDFS and validates it.
 *
 * @param args CLI arguments; expects --namenode and --mdStoreVersion (see
 *             collector_reporter_input_parameter.json)
 * @throws IOException        on HDFS / resource access failures
 * @throws ParseException     on invalid CLI arguments
 * @throws CollectorException when the report is null or indicates a failed collection
 */
public static void main(final String[] args) throws IOException, ParseException, CollectorException {
// argument definitions are loaded from a bundled JSON descriptor
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
CollectorWorker.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/collector_reporter_input_parameter.json")));
argumentParser.parseArgument(args);
final String nameNode = argumentParser.get("namenode");
log.info("nameNode is {}", nameNode);
// mdStoreVersion is a JSON-serialized MDStoreVersion bean
final String mdStoreVersion = argumentParser.get("mdStoreVersion");
log.info("mdStoreVersion is {}", mdStoreVersion);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
// the worker writes its report next to the MDStore version payload on HDFS
final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME;
log.info("report path is {}", reportPath);
final Configuration conf = getHadoopConfiguration(nameNode);
CollectorPluginReport report = readHdfsFileAs(conf, reportPath, CollectorPluginReport.class);
if (Objects.isNull(report)) {
// a missing report means the worker never completed its write: treat as failure
throw new CollectorException("collection report is NULL");
}
log.info("report success: {}, size: {}", report.isSuccess(), report.size());
// dump every report entry to the action log for operator inspection
report.forEach((k, v) -> log.info("{} - {}", k, v));
if (!report.isSuccess()) {
throw new CollectorException("collection report indicates a failure");
}
}
}

View File

@ -1,14 +0,0 @@
[
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the MDStore Version bean",
"paramRequired": true
}
]

View File

@ -110,18 +110,6 @@
<arg>--retryDelay</arg><arg>${retryDelay}</arg> <arg>--retryDelay</arg><arg>${retryDelay}</arg>
<arg>--connectTimeOut</arg><arg>${connectTimeOut}</arg> <arg>--connectTimeOut</arg><arg>${connectTimeOut}</arg>
<arg>--readTimeOut</arg><arg>${readTimeOut}</arg> <arg>--readTimeOut</arg><arg>${readTimeOut}</arg>
<capture-output/>
</java>
<ok to="CollectorReport"/>
<error to="CollectorReport"/>
</action>
<action name="CollectorReport">
<java>
<main-class>eu.dnetlib.dhp.collection.CollectorWorkerReporter</main-class>
<java-opts>${collection_java_xmx}</java-opts>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
</java> </java>
<ok to="GenerateNativeStoreSparkJob"/> <ok to="GenerateNativeStoreSparkJob"/>
<error to="FailCollection"/> <error to="FailCollection"/>

View File

@ -1,30 +0,0 @@
package eu.dnetlib.dhp.collector.worker.utils;
import static eu.dnetlib.dhp.utils.DHPUtils.*;
import java.io.IOException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
// Round-trip test for CollectorPluginReport JSON (de)serialization:
// verifies that custom entries and the success flag survive write+read.
public class CollectorPluginReportTest {
@Test
public void testSerialize() throws IOException {
// populate a report with one arbitrary entry and mark it successful
CollectorPluginReport r1 = new CollectorPluginReport();
r1.put("a", "b");
r1.setSuccess(true);
String s = MAPPER.writeValueAsString(r1);
Assertions.assertNotNull(s);
// deserialize and confirm the success flag round-trips intact
CollectorPluginReport r2 = MAPPER.readValue(s, CollectorPluginReport.class);
Assertions.assertTrue(r2.isSuccess(), "should be true");
}
}