2021-01-28 09:51:17 +01:00
|
|
|
|
2021-02-12 12:31:02 +01:00
|
|
|
package eu.dnetlib.dhp.collection;
|
2021-01-28 09:51:17 +01:00
|
|
|
|
2021-02-08 12:19:38 +01:00
|
|
|
import static eu.dnetlib.dhp.common.Constants.SEQUENCE_FILE_NAME;
|
2021-02-05 19:18:05 +01:00
|
|
|
|
2021-01-28 09:51:17 +01:00
|
|
|
import java.io.IOException;
|
2021-02-12 12:31:02 +01:00
|
|
|
import java.util.Optional;
|
2021-01-28 09:51:17 +01:00
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
2021-02-12 12:31:02 +01:00
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
2021-01-28 09:51:17 +01:00
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
import org.apache.hadoop.io.IntWritable;
|
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
|
|
import org.apache.hadoop.io.Text;
|
2021-02-12 12:31:02 +01:00
|
|
|
import org.apache.hadoop.io.compress.DeflateCodec;
|
2021-01-28 09:51:17 +01:00
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
2021-02-17 10:28:01 +01:00
|
|
|
import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
|
|
|
|
import eu.dnetlib.dhp.aggregation.common.ReportingJob;
|
2021-01-28 09:51:17 +01:00
|
|
|
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
2021-02-24 15:07:24 +01:00
|
|
|
import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
|
2021-02-12 12:31:02 +01:00
|
|
|
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
|
|
|
|
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
|
2021-03-08 09:44:09 +01:00
|
|
|
import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin;
|
2021-08-12 18:20:56 +02:00
|
|
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
|
|
|
import eu.dnetlib.dhp.common.collection.CollectorException;
|
|
|
|
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
2021-05-10 14:32:05 +02:00
|
|
|
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
|
2021-01-28 09:51:17 +01:00
|
|
|
|
2021-02-17 10:28:01 +01:00
|
|
|
public class CollectorWorker extends ReportingJob {
|
2021-01-28 09:51:17 +01:00
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
|
|
|
|
|
|
|
|
private final ApiDescriptor api;
|
|
|
|
|
2021-02-12 12:31:02 +01:00
|
|
|
private final FileSystem fileSystem;
|
2021-01-28 09:51:17 +01:00
|
|
|
|
2021-02-05 19:18:05 +01:00
|
|
|
private final MDStoreVersion mdStoreVersion;
|
2021-01-28 09:51:17 +01:00
|
|
|
|
2021-02-05 19:18:05 +01:00
|
|
|
private final HttpClientParams clientParams;
|
|
|
|
|
2021-01-28 09:51:17 +01:00
|
|
|
public CollectorWorker(
|
|
|
|
final ApiDescriptor api,
|
2021-02-12 12:31:02 +01:00
|
|
|
final FileSystem fileSystem,
|
2021-02-05 19:18:05 +01:00
|
|
|
final MDStoreVersion mdStoreVersion,
|
|
|
|
final HttpClientParams clientParams,
|
2021-02-17 10:28:01 +01:00
|
|
|
final AggregatorReport report) {
|
|
|
|
super(report);
|
2021-01-28 09:51:17 +01:00
|
|
|
this.api = api;
|
2021-02-12 12:31:02 +01:00
|
|
|
this.fileSystem = fileSystem;
|
2021-02-05 19:18:05 +01:00
|
|
|
this.mdStoreVersion = mdStoreVersion;
|
|
|
|
this.clientParams = clientParams;
|
2021-01-28 09:51:17 +01:00
|
|
|
}
|
|
|
|
|
2021-02-05 19:18:05 +01:00
|
|
|
public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
|
2021-02-03 12:33:41 +01:00
|
|
|
|
2021-02-05 19:18:05 +01:00
|
|
|
final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
|
|
|
|
log.info("outputPath path is {}", outputPath);
|
2021-02-03 12:33:41 +01:00
|
|
|
|
2021-02-12 12:31:02 +01:00
|
|
|
final CollectorPlugin plugin = getCollectorPlugin();
|
2021-02-03 12:33:41 +01:00
|
|
|
final AtomicInteger counter = new AtomicInteger(0);
|
2021-02-04 15:51:15 +01:00
|
|
|
|
2021-02-17 10:28:01 +01:00
|
|
|
scheduleReport(counter);
|
2021-02-16 16:53:14 +01:00
|
|
|
|
2021-02-03 12:33:41 +01:00
|
|
|
try (SequenceFile.Writer writer = SequenceFile
|
|
|
|
.createWriter(
|
2021-02-12 12:31:02 +01:00
|
|
|
fileSystem.getConf(),
|
2021-02-05 19:18:05 +01:00
|
|
|
SequenceFile.Writer.file(new Path(outputPath)),
|
2021-02-03 12:33:41 +01:00
|
|
|
SequenceFile.Writer.keyClass(IntWritable.class),
|
2021-02-08 18:06:25 +01:00
|
|
|
SequenceFile.Writer.valueClass(Text.class),
|
2021-02-12 12:31:02 +01:00
|
|
|
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
|
2021-02-03 12:33:41 +01:00
|
|
|
final IntWritable key = new IntWritable(counter.get());
|
|
|
|
final Text value = new Text();
|
|
|
|
plugin
|
2021-02-05 19:18:05 +01:00
|
|
|
.collect(api, report)
|
2021-02-03 12:33:41 +01:00
|
|
|
.forEach(
|
|
|
|
content -> {
|
|
|
|
key.set(counter.getAndIncrement());
|
|
|
|
value.set(content);
|
|
|
|
try {
|
|
|
|
writer.append(key, value);
|
2021-02-05 19:18:05 +01:00
|
|
|
} catch (Throwable e) {
|
2021-02-03 12:33:41 +01:00
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
});
|
2021-02-05 19:18:05 +01:00
|
|
|
} catch (Throwable e) {
|
|
|
|
report.put(e.getClass().getName(), e.getMessage());
|
2021-02-16 16:53:14 +01:00
|
|
|
throw new CollectorException(e);
|
2021-02-03 12:33:41 +01:00
|
|
|
} finally {
|
2021-02-17 10:28:01 +01:00
|
|
|
shutdown();
|
2021-02-15 15:08:59 +01:00
|
|
|
report.ongoing(counter.longValue(), counter.longValue());
|
2021-01-28 09:51:17 +01:00
|
|
|
}
|
|
|
|
}
|
2021-02-05 19:18:05 +01:00
|
|
|
|
2021-02-17 10:28:01 +01:00
|
|
|
private void scheduleReport(AtomicInteger counter) {
|
|
|
|
schedule(new ReporterCallback() {
|
|
|
|
@Override
|
|
|
|
public Long getCurrent() {
|
|
|
|
return counter.longValue();
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Long getTotal() {
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-02-12 12:31:02 +01:00
|
|
|
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
|
2021-02-17 10:28:01 +01:00
|
|
|
|
|
|
|
switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) {
|
|
|
|
case oai:
|
2021-02-12 12:31:02 +01:00
|
|
|
return new OaiCollectorPlugin(clientParams);
|
2021-03-08 09:44:09 +01:00
|
|
|
case rest_json2xml:
|
|
|
|
return new RestCollectorPlugin(clientParams);
|
2021-02-17 10:28:01 +01:00
|
|
|
case other:
|
|
|
|
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
|
2021-02-12 12:31:02 +01:00
|
|
|
.ofNullable(api.getParams().get("other_plugin_type"))
|
2021-02-17 10:28:01 +01:00
|
|
|
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
|
2021-08-11 12:13:22 +02:00
|
|
|
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
|
2021-02-12 12:31:02 +01:00
|
|
|
|
|
|
|
switch (plugin) {
|
2021-02-17 10:28:01 +01:00
|
|
|
case mdstore_mongodb_dump:
|
2021-02-12 12:31:02 +01:00
|
|
|
return new MongoDbDumpCollectorPlugin(fileSystem);
|
2021-02-17 10:28:01 +01:00
|
|
|
case mdstore_mongodb:
|
2021-02-24 15:07:24 +01:00
|
|
|
return new MDStoreCollectorPlugin();
|
2021-02-12 12:31:02 +01:00
|
|
|
default:
|
2021-02-17 10:28:01 +01:00
|
|
|
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
|
2021-02-12 12:31:02 +01:00
|
|
|
}
|
|
|
|
default:
|
2021-02-17 10:28:01 +01:00
|
|
|
throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol());
|
2021-02-12 12:31:02 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-28 09:51:17 +01:00
|
|
|
}
|