WIP: collectorWorker error reporting, added report messages

This commit is contained in:
Claudio Atzori 2021-02-16 16:53:14 +01:00
parent 58288a95b8
commit cf27905a71
5 changed files with 38 additions and 23 deletions

View File

@ -19,7 +19,8 @@ public class Message implements Serializable {
private Map<String, String> body; private Map<String, String> body;
public Message() {} public Message() {
}
public Message(final MessageType messageType, final String workflowId) { public Message(final MessageType messageType, final String workflowId) {
this(messageType, workflowId, new LinkedHashMap<>()); this(messageType, workflowId, new LinkedHashMap<>());

View File

@ -3,12 +3,17 @@ package eu.dnetlib.dhp.collection;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.gson.Gson;
import eu.dnetlib.dhp.message.MessageSender; import eu.dnetlib.dhp.message.MessageSender;
public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable { public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
@ -33,7 +38,10 @@ public class CollectorPluginReport extends LinkedHashMap<String, String> impleme
if (Objects.nonNull(messageSender)) { if (Objects.nonNull(messageSender)) {
log.info("closing report: "); log.info("closing report: ");
this.forEach((k, v) -> log.info("{} - {}", k, v)); this.forEach((k, v) -> log.info("{} - {}", k, v));
messageSender.sendReport(this);
Map<String, String> m = new HashMap<>();
m.put(getClass().getSimpleName().toLowerCase(), new Gson().toJson(values()));
messageSender.sendReport(m);
} }
} }
} }

View File

@ -5,6 +5,8 @@ import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -22,11 +24,11 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.message.MessageSender;
public class CollectorWorker { public class CollectorWorker {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class); private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
public static final int ONGOING_REPORT_FREQUENCY_MS = 5000;
private final ApiDescriptor api; private final ApiDescriptor api;
@ -59,6 +61,14 @@ public class CollectorWorker {
final CollectorPlugin plugin = getCollectorPlugin(); final CollectorPlugin plugin = getCollectorPlugin();
final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger counter = new AtomicInteger(0);
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
report.ongoing(counter.longValue(), null);
}
}, 5000, ONGOING_REPORT_FREQUENCY_MS);
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(
fileSystem.getConf(), fileSystem.getConf(),
@ -73,21 +83,18 @@ public class CollectorWorker {
.forEach( .forEach(
content -> { content -> {
key.set(counter.getAndIncrement()); key.set(counter.getAndIncrement());
if (counter.get() % 500 == 0)
report.ongoing(counter.longValue(), null);
value.set(content); value.set(content);
try { try {
writer.append(key, value); writer.append(key, value);
} catch (Throwable e) { } catch (Throwable e) {
report.put(e.getClass().getName(), e.getMessage());
log.warn("setting report to failed");
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} catch (Throwable e) { } catch (Throwable e) {
report.put(e.getClass().getName(), e.getMessage()); report.put(e.getClass().getName(), e.getMessage());
log.warn("setting report to failed"); throw new CollectorException(e);
} finally { } finally {
timer.cancel();
report.ongoing(counter.longValue(), counter.longValue()); report.ongoing(counter.longValue(), counter.longValue());
} }
} }

View File

@ -38,7 +38,7 @@ public class OaiIterator implements Iterator<String> {
private String token; private String token;
private boolean started; private boolean started;
private final HttpConnector2 httpConnector; private final HttpConnector2 httpConnector;
private CollectorPluginReport errorLogList; private CollectorPluginReport report;
public OaiIterator( public OaiIterator(
final String baseUrl, final String baseUrl,
@ -47,7 +47,7 @@ public class OaiIterator implements Iterator<String> {
final String fromDate, final String fromDate,
final String untilDate, final String untilDate,
final HttpConnector2 httpConnector, final HttpConnector2 httpConnector,
final CollectorPluginReport errorLogList) { final CollectorPluginReport report) {
this.baseUrl = baseUrl; this.baseUrl = baseUrl;
this.mdFormat = mdFormat; this.mdFormat = mdFormat;
this.set = set; this.set = set;
@ -55,7 +55,7 @@ public class OaiIterator implements Iterator<String> {
this.untilDate = untilDate; this.untilDate = untilDate;
this.started = false; this.started = false;
this.httpConnector = httpConnector; this.httpConnector = httpConnector;
this.errorLogList = errorLogList; this.report = report;
} }
private void verifyStarted() { private void verifyStarted() {
@ -113,7 +113,7 @@ public class OaiIterator implements Iterator<String> {
return downloadPage(url); return downloadPage(url);
} catch (final UnsupportedEncodingException e) { } catch (final UnsupportedEncodingException e) {
errorLogList.put(e.getClass().getName(), e.getMessage()); report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
} }
} }
@ -139,27 +139,27 @@ public class OaiIterator implements Iterator<String> {
+ "?verb=ListRecords&resumptionToken=" + "?verb=ListRecords&resumptionToken="
+ URLEncoder.encode(resumptionToken, "UTF-8")); + URLEncoder.encode(resumptionToken, "UTF-8"));
} catch (final UnsupportedEncodingException e) { } catch (final UnsupportedEncodingException e) {
errorLogList.put(e.getClass().getName(), e.getMessage()); report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
} }
} }
private String downloadPage(final String url) throws CollectorException { private String downloadPage(final String url) throws CollectorException {
final String xml = httpConnector.getInputSource(url, errorLogList); final String xml = httpConnector.getInputSource(url, report);
Document doc; Document doc;
try { try {
doc = reader.read(new StringReader(xml)); doc = reader.read(new StringReader(xml));
} catch (final DocumentException e) { } catch (final DocumentException e) {
log.warn("Error parsing xml, I try to clean it. {}", e.getMessage()); log.warn("Error parsing xml, I try to clean it. {}", e.getMessage());
errorLogList.put(e.getClass().getName(), e.getMessage()); report.put(e.getClass().getName(), e.getMessage());
final String cleaned = XmlCleaner.cleanAllEntities(xml); final String cleaned = XmlCleaner.cleanAllEntities(xml);
try { try {
doc = reader.read(new StringReader(cleaned)); doc = reader.read(new StringReader(cleaned));
} catch (final DocumentException e1) { } catch (final DocumentException e1) {
final String resumptionToken = extractResumptionToken(xml); final String resumptionToken = extractResumptionToken(xml);
if (resumptionToken == null) { if (resumptionToken == null) {
errorLogList.put(e1.getClass().getName(), e1.getMessage()); report.put(e1.getClass().getName(), e1.getMessage());
throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1); throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1);
} }
return resumptionToken; return resumptionToken;
@ -172,11 +172,11 @@ public class OaiIterator implements Iterator<String> {
if ("noRecordsMatch".equalsIgnoreCase(code)) { if ("noRecordsMatch".equalsIgnoreCase(code)) {
final String msg = "noRecordsMatch for oai call : " + url; final String msg = "noRecordsMatch for oai call : " + url;
log.warn(msg); log.warn(msg);
errorLogList.put(REPORT_PREFIX + code, msg); report.put(REPORT_PREFIX + code, msg);
return null; return null;
} else { } else {
final String msg = code + " - " + errorNode.getText(); final String msg = code + " - " + errorNode.getText();
errorLogList.put(REPORT_PREFIX + "error", msg); report.put(REPORT_PREFIX + "error", msg);
throw new CollectorException(msg); throw new CollectorException(msg);
} }
} }
@ -188,7 +188,7 @@ public class OaiIterator implements Iterator<String> {
return doc.valueOf("//*[local-name()='resumptionToken']"); return doc.valueOf("//*[local-name()='resumptionToken']");
} }
public CollectorPluginReport getErrorLogList() { public CollectorPluginReport getReport() {
return errorLogList; return report;
} }
} }

View File

@ -18,9 +18,8 @@ public class OaiIteratorFactory {
final String fromDate, final String fromDate,
final String untilDate, final String untilDate,
final HttpClientParams clientParams, final HttpClientParams clientParams,
final CollectorPluginReport errorLogList) { final CollectorPluginReport report) {
return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), report);
errorLogList);
} }
private HttpConnector2 getHttpConnector(HttpClientParams clientParams) { private HttpConnector2 getHttpConnector(HttpClientParams clientParams) {