dnet-hadoop/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java

101 lines
3.2 KiB
Java

package eu.dnetlib.dhp.collection.worker;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.SEQUENCE_FILE_NAME;
import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginReport;
import eu.dnetlib.dhp.collection.worker.utils.HttpClientParams;
import eu.dnetlib.dhp.collection.worker.utils.UnknownCollectorPluginException;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.message.MessageSender;
public class CollectorWorker {
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
private final ApiDescriptor api;
private final Configuration conf;
private final MDStoreVersion mdStoreVersion;
private final HttpClientParams clientParams;
private final CollectorPluginReport report;
private final MessageSender messageSender;
public CollectorWorker(
final ApiDescriptor api,
final Configuration conf,
final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams,
final MessageSender messageSender,
final CollectorPluginReport report) {
this.api = api;
this.conf = conf;
this.mdStoreVersion = mdStoreVersion;
this.clientParams = clientParams;
this.messageSender = messageSender;
this.report = report;
}
public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("outputPath path is {}", outputPath);
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(clientParams, api.getProtocol());
final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(
conf,
SequenceFile.Writer.file(new Path(outputPath)),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
.collect(api, report)
.forEach(
content -> {
key.set(counter.getAndIncrement());
if (counter.get() % 500 == 0)
messageSender.sendMessage(counter.longValue(), null);
value.set(content);
try {
writer.append(key, value);
} catch (Throwable e) {
report.put(e.getClass().getName(), e.getMessage());
log.warn("setting report to failed");
report.setSuccess(false);
throw new RuntimeException(e);
}
});
} catch (Throwable e) {
report.put(e.getClass().getName(), e.getMessage());
log.warn("setting report to failed");
report.setSuccess(false);
} finally {
messageSender.sendMessage(counter.longValue(), counter.longValue());
}
}
}