diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java index ecccf8a43..f1107b4b8 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java @@ -19,7 +19,8 @@ public class Message implements Serializable { private Map body; - public Message() {} + public Message() { + } public Message(final MessageType messageType, final String workflowId) { this(messageType, workflowId, new LinkedHashMap<>()); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java index a10572d06..d8f167d49 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java @@ -3,12 +3,17 @@ package eu.dnetlib.dhp.collection; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.Map; import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Joiner; +import com.google.gson.Gson; + import eu.dnetlib.dhp.message.MessageSender; public class CollectorPluginReport extends LinkedHashMap implements Closeable { @@ -33,7 +38,10 @@ public class CollectorPluginReport extends LinkedHashMap impleme if (Objects.nonNull(messageSender)) { log.info("closing report: "); this.forEach((k, v) -> log.info("{} - {}", k, v)); - messageSender.sendReport(this); + + Map m = new HashMap<>(); + m.put(getClass().getSimpleName().toLowerCase(), new Gson().toJson(values())); + messageSender.sendReport(m); } } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index 04e0f70c4..154b50414 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -5,6 +5,8 @@ import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME; import java.io.IOException; import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; @@ -22,11 +24,11 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; -import eu.dnetlib.dhp.message.MessageSender; public class CollectorWorker { private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class); + public static final int ONGOING_REPORT_FREQUENCY_MS = 5000; private final ApiDescriptor api; @@ -59,6 +61,14 @@ public class CollectorWorker { final CollectorPlugin plugin = getCollectorPlugin(); final AtomicInteger counter = new AtomicInteger(0); + final Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + report.ongoing(counter.longValue(), null); + } + }, 5000, ONGOING_REPORT_FREQUENCY_MS); + try (SequenceFile.Writer writer = SequenceFile .createWriter( fileSystem.getConf(), @@ -73,21 +83,18 @@ public class CollectorWorker { .forEach( content -> { key.set(counter.getAndIncrement()); - if (counter.get() % 500 == 0) - report.ongoing(counter.longValue(), null); value.set(content); try { writer.append(key, value); } catch (Throwable e) { - report.put(e.getClass().getName(), e.getMessage()); - log.warn("setting report to failed"); throw new RuntimeException(e); } }); } catch (Throwable e) { report.put(e.getClass().getName(), e.getMessage()); - log.warn("setting report to failed"); + throw new CollectorException(e); } finally { + timer.cancel(); report.ongoing(counter.longValue(), counter.longValue()); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java index edfcb7bb5..0a0a4c734 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java @@ -38,7 +38,7 @@ public class OaiIterator implements Iterator { private String token; private boolean started; private final HttpConnector2 httpConnector; - private CollectorPluginReport errorLogList; + private CollectorPluginReport report; public OaiIterator( final String baseUrl, @@ -47,7 +47,7 @@ public class OaiIterator implements Iterator { final String fromDate, final String untilDate, final HttpConnector2 httpConnector, - final CollectorPluginReport errorLogList) { + final CollectorPluginReport report) { this.baseUrl = baseUrl; this.mdFormat = mdFormat; this.set = set; @@ -55,7 +55,7 @@ public class OaiIterator implements Iterator { this.untilDate = untilDate; this.started = false; this.httpConnector = httpConnector; - this.errorLogList = errorLogList; + this.report = report; } private void verifyStarted() { @@ -113,7 +113,7 @@ public class OaiIterator implements Iterator { return downloadPage(url); } catch (final UnsupportedEncodingException e) { - errorLogList.put(e.getClass().getName(), e.getMessage()); + report.put(e.getClass().getName(), e.getMessage()); throw new CollectorException(e); } } @@ -139,27 +139,27 @@ public class OaiIterator implements Iterator { + "?verb=ListRecords&resumptionToken=" + URLEncoder.encode(resumptionToken, "UTF-8")); } catch (final UnsupportedEncodingException e) { - errorLogList.put(e.getClass().getName(), e.getMessage()); + report.put(e.getClass().getName(), e.getMessage()); throw new CollectorException(e); } } private String downloadPage(final String url) throws CollectorException { - final String xml = httpConnector.getInputSource(url, errorLogList); + final String xml = httpConnector.getInputSource(url, report); Document doc; try { doc = reader.read(new StringReader(xml)); } catch (final DocumentException e) { log.warn("Error parsing xml, I try to clean it. {}", e.getMessage()); - errorLogList.put(e.getClass().getName(), e.getMessage()); + report.put(e.getClass().getName(), e.getMessage()); final String cleaned = XmlCleaner.cleanAllEntities(xml); try { doc = reader.read(new StringReader(cleaned)); } catch (final DocumentException e1) { final String resumptionToken = extractResumptionToken(xml); if (resumptionToken == null) { - errorLogList.put(e1.getClass().getName(), e1.getMessage()); + report.put(e1.getClass().getName(), e1.getMessage()); throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1); } return resumptionToken; @@ -172,11 +172,11 @@ public class OaiIterator implements Iterator { if ("noRecordsMatch".equalsIgnoreCase(code)) { final String msg = "noRecordsMatch for oai call : " + url; log.warn(msg); - errorLogList.put(REPORT_PREFIX + code, msg); + report.put(REPORT_PREFIX + code, msg); return null; } else { final String msg = code + " - " + errorNode.getText(); - errorLogList.put(REPORT_PREFIX + "error", msg); + report.put(REPORT_PREFIX + "error", msg); throw new CollectorException(msg); } } @@ -188,7 +188,7 @@ public class OaiIterator implements Iterator { return doc.valueOf("//*[local-name()='resumptionToken']"); } - public CollectorPluginReport getErrorLogList() { - return errorLogList; + public CollectorPluginReport getReport() { + return report; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java index d7b5de087..a72a62f13 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java @@ -18,9 +18,8 @@ public class OaiIteratorFactory { final String fromDate, final String untilDate, final HttpClientParams clientParams, - final CollectorPluginReport errorLogList) { - return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), - errorLogList); + final CollectorPluginReport report) { + return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), report); } private HttpConnector2 getHttpConnector(HttpClientParams clientParams) {