diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java similarity index 69% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java index d8f167d49..269f8f6e9 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.collection; +package eu.dnetlib.dhp.aggregation.common; import java.io.Closeable; import java.io.IOException; @@ -11,21 +11,20 @@ import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; import com.google.gson.Gson; import eu.dnetlib.dhp.message.MessageSender; -public class CollectorPluginReport extends LinkedHashMap implements Closeable { +public class AggregatorReport extends LinkedHashMap implements Closeable { - private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class); + private static final Logger log = LoggerFactory.getLogger(AggregatorReport.class); private MessageSender messageSender; - public CollectorPluginReport() { + public AggregatorReport() { } - public CollectorPluginReport(MessageSender messageSender) throws IOException { + public AggregatorReport(MessageSender messageSender) throws IOException { this.messageSender = messageSender; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/ReporterCallback.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/ReporterCallback.java new file mode 100644 index 000000000..b289b6e07 --- /dev/null +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/ReporterCallback.java @@ -0,0 +1,10 @@ + +package eu.dnetlib.dhp.aggregation.common; + +public interface ReporterCallback { + + Long getCurrent(); + + Long getTotal(); + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/ReportingJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/ReportingJob.java new file mode 100644 index 000000000..791226034 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/ReportingJob.java @@ -0,0 +1,58 @@ + +package eu.dnetlib.dhp.aggregation.common; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for jobs that periodically publish their advancement through an {@link AggregatorReport}. + */ +public abstract class ReportingJob { + + private static final Logger log = LoggerFactory.getLogger(ReportingJob.class); + + /** + * Frequency (seconds) for sending ongoing messages to report the collection task advancement + */ + public static final int ONGOING_REPORT_FREQUENCY = 5; + + /** + * Initial delay (seconds) for sending ongoing messages to report the collection task advancement + */ + public static final int INITIAL_DELAY = 2; + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + protected final AggregatorReport report; + + public ReportingJob(AggregatorReport report) { + this.report = report; + } + + /** + * Schedules an ongoing report every ONGOING_REPORT_FREQUENCY seconds, starting after INITIAL_DELAY seconds. + * + * @param callback supplies the current and total counters read at each execution + */ + protected void schedule(final ReporterCallback callback) { + executor.scheduleAtFixedRate(() -> { + try { + report.ongoing(callback.getCurrent(), callback.getTotal()); + } catch (RuntimeException e) { + /* a run that throws makes scheduleAtFixedRate suppress every later run: log it and keep reporting */ + log.warn("unable to send the ongoing report", e); + } + }, INITIAL_DELAY, ONGOING_REPORT_FREQUENCY, TimeUnit.SECONDS); + } + + /** + * Shuts down the executor that drives the periodic reports. + */ + protected void shutdown() { + executor.shutdown(); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index 154b50414..a397c4f9d 
100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -5,11 +5,8 @@ import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME; import java.io.IOException; import java.util.Optional; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -20,15 +17,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; +import eu.dnetlib.dhp.aggregation.common.ReporterCallback; +import eu.dnetlib.dhp.aggregation.common.ReportingJob; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; -public class CollectorWorker { +public class CollectorWorker extends ReportingJob { private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class); - public static final int ONGOING_REPORT_FREQUENCY_MS = 5000; private final ApiDescriptor api; @@ -38,19 +37,17 @@ public class CollectorWorker { private final HttpClientParams clientParams; - private final CollectorPluginReport report; - public CollectorWorker( final ApiDescriptor api, final FileSystem fileSystem, final MDStoreVersion mdStoreVersion, final HttpClientParams clientParams, - final CollectorPluginReport report) { + final AggregatorReport report) { + super(report); this.api = api; this.fileSystem = fileSystem; this.mdStoreVersion = mdStoreVersion; this.clientParams = clientParams; - this.report = report; } public void 
collect() throws UnknownCollectorPluginException, CollectorException, IOException { @@ -61,13 +58,7 @@ public class CollectorWorker { final CollectorPlugin plugin = getCollectorPlugin(); final AtomicInteger counter = new AtomicInteger(0); - final Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - report.ongoing(counter.longValue(), null); - } - }, 5000, ONGOING_REPORT_FREQUENCY_MS); + scheduleReport(counter); try (SequenceFile.Writer writer = SequenceFile .createWriter( @@ -94,30 +85,71 @@ public class CollectorWorker { report.put(e.getClass().getName(), e.getMessage()); throw new CollectorException(e); } finally { - timer.cancel(); + shutdown(); report.ongoing(counter.longValue(), counter.longValue()); } } + private void scheduleReport(AtomicInteger counter) { + schedule(new ReporterCallback() { + @Override + public Long getCurrent() { + return counter.longValue(); + } + + @Override + public Long getTotal() { + return null; + } + }); + } + + /** + * Instantiates the collector plugin matching the protocol declared by the API descriptor. + * + * @return the plugin implementation for the configured protocol + * @throws UnknownCollectorPluginException when the protocol or the other_plugin_type parameter is missing or not managed + */ private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException { - switch (StringUtils.lowerCase(StringUtils.trim(api.getProtocol()))) { - case "oai": + + switch (parseName(CollectorPlugin.NAME.class, api.getProtocol(), "protocol")) { + case oai: return new OaiCollectorPlugin(clientParams); - case "other": + case other: final String plugin = Optional .ofNullable(api.getParams().get("other_plugin_type")) .orElseThrow(() -> new UnknownCollectorPluginException("other_plugin_type")); - switch (plugin) { - case "mdstore_mongodb_dump": + switch (parseName(CollectorPlugin.NAME.OTHER_NAME.class, plugin, "plugin")) { + case mdstore_mongodb_dump: return new MongoDbDumpCollectorPlugin(fileSystem); - case "mdstore_mongodb": + case mdstore_mongodb: return new MongoDbCollectorPlugin(); default: - throw new UnknownCollectorPluginException("Unknown plugin type: " + plugin); + throw new UnknownCollectorPluginException("plugin is not managed: " + plugin); } 
default: - throw new UnknownCollectorPluginException("Unknown protocol: " + api.getProtocol()); + throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol()); } } + + /** + * Resolves an enum constant by name, ignoring surrounding whitespace and letter case. + * + * @param type the enum class to search + * @param value the raw name, possibly null + * @param kind label used in the error message + * @return the matching constant + * @throws UnknownCollectorPluginException when no constant matches + */ + private static <E extends Enum<E>> E parseName(final Class<E> type, final String value, final String kind) + throws UnknownCollectorPluginException { + final String wanted = value == null ? "" : value.trim(); + for (final E candidate : type.getEnumConstants()) { + if (candidate.name().equalsIgnoreCase(wanted)) { + return candidate; + } + } + throw new UnknownCollectorPluginException(kind + " is not managed: " + value); + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java index 15f3f20b5..2c5640499 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java @@ -10,11 +10,11 @@ import java.util.Optional; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.message.MessageSender; @@ -80,11 +80,10 @@ public class CollectorWorkerApplication { String dnetMessageManagerURL, String workflowId) throws IOException, CollectorException, UnknownCollectorPluginException { + final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId); - final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); - - try (CollectorPluginReport report = new CollectorPluginReport(ms)) { + try (AggregatorReport report = new AggregatorReport(ms)) { new CollectorWorker(api, fileSystem, currentVersion, clientParams, report).collect(); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java index 72a2a70a2..ddf9efa36 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/HttpConnector2.java @@ -15,6 +15,8 @@ import org.apache.http.HttpHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; + /** * Migrated from https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/HttpConnector.java * @@ -42,17 +44,17 @@ public class HttpConnector2 { } /** - * @see HttpConnector2#getInputSource(java.lang.String, CollectorPluginReport) + * @see HttpConnector2#getInputSource(java.lang.String, AggregatorReport) */ public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException { return IOUtils.toInputStream(getInputSource(requestUrl)); } /** - * @see HttpConnector2#getInputSource(java.lang.String, CollectorPluginReport) + * @see HttpConnector2#getInputSource(java.lang.String, AggregatorReport) */ public String getInputSource(final String requestUrl) throws CollectorException { - return attemptDownloadAsString(requestUrl, 1, new CollectorPluginReport()); + return attemptDownloadAsString(requestUrl, 1, new AggregatorReport()); } /** @@ -63,13 +65,13 @@ public class HttpConnector2 { * @return the content of the downloaded resource * @throws CollectorException when retrying more than maxNumberOfRetry times */ - public String getInputSource(final String requestUrl, CollectorPluginReport report) + public String getInputSource(final String requestUrl, AggregatorReport report) throws CollectorException { return attemptDownloadAsString(requestUrl, 1, report); } private String attemptDownloadAsString(final String requestUrl, final int retryNumber, - final 
CollectorPluginReport report) throws CollectorException { + final AggregatorReport report) throws CollectorException { try (InputStream s = attemptDownload(requestUrl, retryNumber, report)) { return IOUtils.toString(s); @@ -80,7 +82,7 @@ public class HttpConnector2 { } private InputStream attemptDownload(final String requestUrl, final int retryNumber, - final CollectorPluginReport report) throws CollectorException, IOException { + final AggregatorReport report) throws CollectorException, IOException { if (retryNumber > getClientParams().getMaxNumberOfRetry()) { final String msg = String diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index 0a4b3a892..0ed6be5fa 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -3,12 +3,21 @@ package eu.dnetlib.dhp.collection.plugin; import java.util.stream.Stream; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.CollectorException; -import eu.dnetlib.dhp.collection.CollectorPluginReport; public interface CollectorPlugin { - Stream collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException; + enum NAME { + oai, other; + + public enum OTHER_NAME { + mdstore_mongodb_dump, mdstore_mongodb + } + + } + + Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java index 7d1952f9c..89b92ffa1 100644 --- 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbCollectorPlugin.java @@ -13,9 +13,9 @@ import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.CollectorException; -import eu.dnetlib.dhp.collection.CollectorPluginReport; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; public class MongoDbCollectorPlugin implements CollectorPlugin { @@ -26,7 +26,7 @@ public class MongoDbCollectorPlugin implements CollectorPlugin { public static final String MONGODB_DBNAME = "mongodb_dbname"; @Override - public Stream collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException { + public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { final String host = Optional .ofNullable(api.getParams().get(MONGODB_HOST)) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java index d08732593..3199af5b7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.java @@ -12,9 +12,9 @@ import java.util.zip.GZIPInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.CollectorException; -import 
eu.dnetlib.dhp.collection.CollectorPluginReport; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; import eu.dnetlib.dhp.utils.DHPUtils; @@ -30,7 +30,7 @@ public class MongoDbDumpCollectorPlugin implements CollectorPlugin { } @Override - public Stream collect(ApiDescriptor api, CollectorPluginReport report) throws CollectorException { + public Stream collect(ApiDescriptor api, AggregatorReport report) throws CollectorException { final Path path = Optional .ofNullable(api.getParams().get("path")) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java index 8efdeb838..4600562ca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java @@ -13,9 +13,9 @@ import com.google.common.base.Splitter; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.ApiDescriptor; import eu.dnetlib.dhp.collection.CollectorException; -import eu.dnetlib.dhp.collection.CollectorPluginReport; import eu.dnetlib.dhp.collection.HttpClientParams; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; @@ -35,7 +35,7 @@ public class OaiCollectorPlugin implements CollectorPlugin { } @Override - public Stream collect(final ApiDescriptor api, final CollectorPluginReport report) + public Stream collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException { final String baseUrl = api.getBaseUrl(); final String mdFormat = api.getParams().get(FORMAT_PARAM); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java index 0a0a4c734..887027f21 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java @@ -16,8 +16,8 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.CollectorException; -import eu.dnetlib.dhp.collection.CollectorPluginReport; import eu.dnetlib.dhp.collection.HttpConnector2; import eu.dnetlib.dhp.collection.XmlCleaner; @@ -38,7 +38,7 @@ public class OaiIterator implements Iterator { private String token; private boolean started; private final HttpConnector2 httpConnector; - private CollectorPluginReport report; + private AggregatorReport report; public OaiIterator( final String baseUrl, @@ -47,7 +47,7 @@ public class OaiIterator implements Iterator { final String fromDate, final String untilDate, final HttpConnector2 httpConnector, - final CollectorPluginReport report) { + final AggregatorReport report) { this.baseUrl = baseUrl; this.mdFormat = mdFormat; this.set = set; @@ -188,7 +188,7 @@ public class OaiIterator implements Iterator { return doc.valueOf("//*[local-name()='resumptionToken']"); } - public CollectorPluginReport getReport() { + public AggregatorReport getReport() { return report; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java index a72a62f13..48f6a94c8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIteratorFactory.java @@ -3,7 +3,7 
@@ package eu.dnetlib.dhp.collection.plugin.oai; import java.util.Iterator; -import eu.dnetlib.dhp.collection.CollectorPluginReport; +import eu.dnetlib.dhp.aggregation.common.AggregatorReport; import eu.dnetlib.dhp.collection.HttpClientParams; import eu.dnetlib.dhp.collection.HttpConnector2; @@ -18,7 +18,7 @@ public class OaiIteratorFactory { final String fromDate, final String untilDate, final HttpClientParams clientParams, - final CollectorPluginReport report) { + final AggregatorReport report) { return new OaiIterator(baseUrl, mdFormat, set, fromDate, untilDate, getHttpConnector(clientParams), report); }