WIP: collectorWorker error reporting, generalised reported implementation

This commit is contained in:
Claudio Atzori 2021-02-17 10:28:01 +01:00
parent cf27905a71
commit b592d78bb4
12 changed files with 123 additions and 56 deletions

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.collection;
package eu.dnetlib.dhp.aggregation.common;
import java.io.Closeable;
import java.io.IOException;
@ -11,21 +11,20 @@ import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.gson.Gson;
import eu.dnetlib.dhp.message.MessageSender;
public class CollectorPluginReport extends LinkedHashMap<String, String> implements Closeable {
public class AggregatorReport extends LinkedHashMap<String, String> implements Closeable {
private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class);
private static final Logger log = LoggerFactory.getLogger(AggregatorReport.class);
private MessageSender messageSender;
public CollectorPluginReport() {
public AggregatorReport() {
}
public CollectorPluginReport(MessageSender messageSender) throws IOException {
public AggregatorReport(MessageSender messageSender) throws IOException {
this.messageSender = messageSender;
}

View File

@ -0,0 +1,10 @@
package eu.dnetlib.dhp.aggregation.common;
public interface ReporterCallback {
Long getCurrent();
Long getTotal();
}

View File

@ -0,0 +1,41 @@
package eu.dnetlib.dhp.aggregation.common;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public abstract class ReportingJob {
/**
* Frequency (seconds) for sending ongoing messages to report the collection task advancement
*/
public static final int ONGOING_REPORT_FREQUENCY = 5;
/**
* Initial delay (seconds) for sending ongoing messages to report the collection task advancement
*/
public static final int INITIAL_DELAY = 2;
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
protected final AggregatorReport report;
public ReportingJob(AggregatorReport report) {
this.report = report;
}
protected void schedule(final ReporterCallback callback) {
executor.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
report.ongoing(callback.getCurrent(), callback.getTotal());
}
}, INITIAL_DELAY, ONGOING_REPORT_FREQUENCY, TimeUnit.SECONDS);
}
protected void shutdown() {
executor.shutdown();
}
}

View File

@ -5,11 +5,8 @@ import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME;
import java.io.IOException;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@ -20,15 +17,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
import eu.dnetlib.dhp.aggregation.common.ReportingJob;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
public class CollectorWorker {
public class CollectorWorker extends ReportingJob {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
public static final int ONGOING_REPORT_FREQUENCY_MS = 5000;
private final ApiDescriptor api;
@ -38,19 +37,17 @@ public class CollectorWorker {
private final HttpClientParams clientParams;
private final CollectorPluginReport report;
public CollectorWorker(
final ApiDescriptor api,
final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams,
final CollectorPluginReport report) {
final AggregatorReport report) {
super(report);
this.api = api;
this.fileSystem = fileSystem;
this.mdStoreVersion = mdStoreVersion;
this.clientParams = clientParams;
this.report = report;
}
public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
@ -61,13 +58,7 @@ public class CollectorWorker {
final CollectorPlugin plugin = getCollectorPlugin();
final AtomicInteger counter = new AtomicInteger(0);
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
report.ongoing(counter.longValue(), null);
}
}, 5000, ONGOING_REPORT_FREQUENCY_MS);
scheduleReport(counter);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
@ -94,30 +85,46 @@ public class CollectorWorker {
report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e);
} finally {
timer.cancel();
shutdown();
report.ongoing(counter.longValue(), counter.longValue());
}
}
private void scheduleReport(AtomicInteger counter) {
schedule(new ReporterCallback() {
@Override
public Long getCurrent() {
return counter.longValue();
}
@Override
public Long getTotal() {
return null;
}
});
}
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
switch (StringUtils.lowerCase(StringUtils.trim(api.getProtocol()))) {
case "oai":
switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) {
case oai:
return new OaiCollectorPlugin(clientParams);
case "other":
final String plugin = Optional
case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(api.getParams().get("other_plugin_type"))
.orElseThrow(() -> new UnknownCollectorPluginException("other_plugin_type"));
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
.get();
switch (plugin) {
case "mdstore_mongodb_dump":
case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(fileSystem);
case "mdstore_mongodb":
case mdstore_mongodb:
return new MongoDbCollectorPlugin();
default:
throw new UnknownCollectorPluginException("Unknown plugin type: " + plugin);
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
default:
throw new UnknownCollectorPluginException("Unknown protocol: " + api.getProtocol());
throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol());
}
}

View File

@ -10,11 +10,11 @@ import java.util.Optional;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.message.MessageSender;
@ -80,11 +80,10 @@ public class CollectorWorkerApplication {
String dnetMessageManagerURL, String workflowId)
throws IOException, CollectorException, UnknownCollectorPluginException {
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId);
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
try (CollectorPluginReport report = new CollectorPluginReport(ms)) {
try (AggregatorReport report = new AggregatorReport(ms)) {
new CollectorWorker(api, fileSystem, currentVersion, clientParams, report).collect();
}
}

View File

@ -15,6 +15,8 @@ import org.apache.http.HttpHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
/**
* Migrated from https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/HttpConnector.java
*
@ -42,17 +44,17 @@ public class HttpConnector2 {
}
/**
* @see HttpConnector2#getInputSource(java.lang.String, CollectorPluginReport)
* @see HttpConnector2#getInputSource(java.lang.String, AggregatorReport)
*/
public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException {
return IOUtils.toInputStream(getInputSource(requestUrl));
}
/**
* @see HttpConnector2#getInputSource(java.lang.String, CollectorPluginReport)
* @see HttpConnector2#getInputSource(java.lang.String, AggregatorReport)
*/
public String getInputSource(final String requestUrl) throws CollectorException {
return attemptDownloadAsString(requestUrl, 1, new CollectorPluginReport());
return attemptDownloadAsString(requestUrl, 1, new AggregatorReport());
}
/**
@ -63,13 +65,13 @@ public class HttpConnector2 {
* @return the content of the downloaded resource
* @throws CollectorException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl, CollectorPluginReport report)
public String getInputSource(final String requestUrl, AggregatorReport report)
throws CollectorException {
return attemptDownloadAsString(requestUrl, 1, report);
}
private String attemptDownloadAsString(final String requestUrl, final int retryNumber,
final CollectorPluginReport report) throws CollectorException {
final AggregatorReport report) throws CollectorException {
try (InputStream s = attemptDownload(requestUrl, retryNumber, report)) {
return IOUtils.toString(s);
@ -80,7 +82,7 @@ public class HttpConnector2 {
}
private InputStream attemptDownload(final String requestUrl, final int retryNumber,
final CollectorPluginReport report) throws CollectorException, IOException {
final AggregatorReport report) throws CollectorException, IOException {
if (retryNumber > getClientParams().getMaxNumberOfRetry()) {
final String msg = String

View File

@ -3,12 +3,21 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
public interface CollectorPlugin {
Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException;
enum NAME {
oai, other;
public enum OTHER_NAME {
mdstore_mongodb_dump, mdstore_mongodb
}
}
Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException;
}

View File

@ -13,9 +13,9 @@ import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
public class MongoDbCollectorPlugin implements CollectorPlugin {
@ -26,7 +26,7 @@ public class MongoDbCollectorPlugin implements CollectorPlugin {
public static final String MONGODB_DBNAME = "mongodb_dbname";
@Override
public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
final String host = Optional
.ofNullable(api.getParams().get(MONGODB_HOST))

View File

@ -12,9 +12,9 @@ import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.utils.DHPUtils;
@ -30,7 +30,7 @@ public class MongoDbDumpCollectorPlugin implements CollectorPlugin {
}
@Override
public Stream<String> collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException {
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
final Path path = Optional
.ofNullable(api.getParams().get("path"))

View File

@ -13,9 +13,9 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
@ -35,7 +35,7 @@ public class OaiCollectorPlugin implements CollectorPlugin {
}
@Override
public Stream<String> collect(final ApiDescriptor api, final CollectorPluginReport report)
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report)
throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String mdFormat = api.getParams().get(FORMAT_PARAM);

View File

@ -16,8 +16,8 @@ import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.collection.HttpConnector2;
import eu.dnetlib.dhp.collection.XmlCleaner;
@ -38,7 +38,7 @@ public class OaiIterator implements Iterator<String> {
private String token;
private boolean started;
private final HttpConnector2 httpConnector;
private CollectorPluginReport report;
private AggregatorReport report;
public OaiIterator(
final String baseUrl,
@ -47,7 +47,7 @@ public class OaiIterator implements Iterator<String> {
final String fromDate,
final String untilDate,
final HttpConnector2 httpConnector,
final CollectorPluginReport report) {
final AggregatorReport report) {
this.baseUrl = baseUrl;
this.mdFormat = mdFormat;
this.set = set;
@ -188,7 +188,7 @@ public class OaiIterator implements Iterator<String> {
return doc.valueOf("//*[local-name()='resumptionToken']");
}
public CollectorPluginReport getReport() {
public AggregatorReport getReport() {
return report;
}
}

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.collection.plugin.oai;
import java.util.Iterator;
import eu.dnetlib.dhp.collection.CollectorPluginReport;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.HttpClientParams;
import eu.dnetlib.dhp.collection.HttpConnector2;
@ -18,7 +18,7 @@ public class OaiIteratorFactory {
final String fromDate,
final String untilDate,
final HttpClientParams clientParams,
final CollectorPluginReport report) {
final AggregatorReport report) {
return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), report);
}