2021-01-28 09:51:17 +01:00
|
|
|
|
|
|
|
package eu.dnetlib.dhp.collection.worker;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.net.URI;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
import org.apache.hadoop.io.IntWritable;
|
|
|
|
import org.apache.hadoop.io.SequenceFile;
|
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
|
2021-02-03 12:33:41 +01:00
|
|
|
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginErrorLogList;
|
2021-01-28 09:51:17 +01:00
|
|
|
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
|
2021-01-29 16:42:41 +01:00
|
|
|
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
|
2021-01-28 09:51:17 +01:00
|
|
|
|
|
|
|
public class CollectorWorker {
|
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class);
|
|
|
|
|
|
|
|
private final ApiDescriptor api;
|
|
|
|
|
|
|
|
private final String hdfsuri;
|
|
|
|
|
|
|
|
private final String hdfsPath;
|
|
|
|
|
|
|
|
public CollectorWorker(
|
|
|
|
final ApiDescriptor api,
|
|
|
|
final String hdfsuri,
|
2021-02-04 14:06:02 +01:00
|
|
|
final String hdfsPath) {
|
2021-01-28 09:51:17 +01:00
|
|
|
this.api = api;
|
|
|
|
this.hdfsuri = hdfsuri;
|
|
|
|
this.hdfsPath = hdfsPath;
|
|
|
|
}
|
|
|
|
|
2021-02-03 12:33:41 +01:00
|
|
|
public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
|
|
|
|
|
|
|
|
// ====== Init HDFS File System Object
|
|
|
|
Configuration conf = new Configuration();
|
|
|
|
// Set FileSystem URI
|
|
|
|
conf.set("fs.defaultFS", hdfsuri);
|
|
|
|
// Because of Maven
|
|
|
|
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
|
|
|
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
|
|
|
|
|
|
|
System.setProperty("hadoop.home.dir", "/");
|
|
|
|
// Get the filesystem - HDFS
|
|
|
|
|
|
|
|
FileSystem.get(URI.create(hdfsuri), conf);
|
|
|
|
Path hdfswritepath = new Path(hdfsPath);
|
|
|
|
|
|
|
|
log.info("Created path " + hdfswritepath.toString());
|
|
|
|
|
2021-02-04 14:06:02 +01:00
|
|
|
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
|
2021-02-03 12:33:41 +01:00
|
|
|
final AtomicInteger counter = new AtomicInteger(0);
|
|
|
|
try (SequenceFile.Writer writer = SequenceFile
|
|
|
|
.createWriter(
|
|
|
|
conf,
|
|
|
|
SequenceFile.Writer.file(hdfswritepath),
|
|
|
|
SequenceFile.Writer.keyClass(IntWritable.class),
|
|
|
|
SequenceFile.Writer.valueClass(Text.class))) {
|
|
|
|
final IntWritable key = new IntWritable(counter.get());
|
|
|
|
final Text value = new Text();
|
|
|
|
plugin
|
|
|
|
.collect(api)
|
|
|
|
.forEach(
|
|
|
|
content -> {
|
|
|
|
key.set(counter.getAndIncrement());
|
|
|
|
value.set(content);
|
|
|
|
try {
|
|
|
|
writer.append(key, value);
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new RuntimeException(e);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} finally {
|
|
|
|
return plugin.getCollectionErrors();
|
2021-01-28 09:51:17 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|