preparation for the new plugins

Michele Artini 2024-12-19 09:28:53 +01:00
parent ce22b1d536
commit 3777f3c15d
6 changed files with 199 additions and 50 deletions

eu/dnetlib/dhp/collection/CollectorWorker.java

@@ -20,6 +20,8 @@ import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
 import eu.dnetlib.dhp.aggregation.common.ReportingJob;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.csv.FileCsvCollectorPlugin;
+import eu.dnetlib.dhp.collection.plugin.csv.HttpCsvCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.gtr2.Gtr2PublicationsCollectorPlugin;
@@ -47,11 +49,11 @@ public class CollectorWorker extends ReportingJob {
 	private final HttpClientParams clientParams;
 
 	public CollectorWorker(
-		final ApiDescriptor api,
-		final FileSystem fileSystem,
-		final MDStoreVersion mdStoreVersion,
-		final HttpClientParams clientParams,
-		final AggregatorReport report) {
+			final ApiDescriptor api,
+			final FileSystem fileSystem,
+			final MDStoreVersion mdStoreVersion,
+			final HttpClientParams clientParams,
+			final AggregatorReport report) {
 		super(report);
 		this.api = api;
 		this.fileSystem = fileSystem;
@@ -70,25 +72,22 @@ public class CollectorWorker extends ReportingJob {
 		scheduleReport(counter);
 		try (SequenceFile.Writer writer = SequenceFile
-			.createWriter(
-				this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-					.keyClass(IntWritable.class),
-				SequenceFile.Writer
-					.valueClass(Text.class),
-				SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+			.createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+				.keyClass(IntWritable.class), SequenceFile.Writer
+				.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
 			final IntWritable key = new IntWritable(counter.get());
 			final Text value = new Text();
 			plugin
-				.collect(this.api, this.report)
-				.forEach(content -> {
-					key.set(counter.getAndIncrement());
-					value.set(content);
-					try {
-						writer.append(key, value);
-					} catch (final Throwable e) {
-						throw new RuntimeException(e);
-					}
-				});
+					.collect(this.api, this.report)
+					.forEach(content -> {
+						key.set(counter.getAndIncrement());
+						value.set(content);
+						try {
+							writer.append(key, value);
+						} catch (final Throwable e) {
+							throw new RuntimeException(e);
+						}
+					});
 		} catch (final Throwable e) {
 			this.report.put(e.getClass().getName(), e.getMessage());
 			throw new CollectorException(e);
@@ -116,38 +115,42 @@ public class CollectorWorker extends ReportingJob {
 	private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
 		switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
-		case oai:
-			return new OaiCollectorPlugin(this.clientParams);
-		case rest_json2xml:
-			return new RestCollectorPlugin(this.clientParams);
-		case file:
-			return new FileCollectorPlugin(this.fileSystem);
-		case fileGzip:
-			return new FileGZipCollectorPlugin(this.fileSystem);
-		case baseDump:
-			return new BaseCollectorPlugin(this.fileSystem);
-		case gtr2Publications:
-			return new Gtr2PublicationsCollectorPlugin(this.clientParams);
-		case osfPreprints:
-			return new OsfPreprintsCollectorPlugin(this.clientParams);
-		case zenodoDump:
-			return new CollectZenodoDumpCollectorPlugin();
-		case other:
-			final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
+			case oai:
+				return new OaiCollectorPlugin(this.clientParams);
+			case rest_json2xml:
+				return new RestCollectorPlugin(this.clientParams);
+			case file:
+				return new FileCollectorPlugin(this.fileSystem);
+			case fileGzip:
+				return new FileGZipCollectorPlugin(this.fileSystem);
+			case baseDump:
+				return new BaseCollectorPlugin(this.fileSystem);
+			case gtr2Publications:
+				return new Gtr2PublicationsCollectorPlugin(this.clientParams);
+			case osfPreprints:
+				return new OsfPreprintsCollectorPlugin(this.clientParams);
+			case zenodoDump:
+				return new CollectZenodoDumpCollectorPlugin();
+			case fileCSV:
+				return new FileCsvCollectorPlugin(this.fileSystem);
+			case httpCSV:
+				return new HttpCsvCollectorPlugin(this.clientParams);
+			case other:
+				final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
 				.ofNullable(this.api.getParams().get("other_plugin_type"))
 				.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
 				.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
-			switch (plugin) {
-			case mdstore_mongodb_dump:
-				return new MongoDbDumpCollectorPlugin(this.fileSystem);
-			case mdstore_mongodb:
-				return new MDStoreCollectorPlugin();
-			default:
-				throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
-			}
-		default:
-			throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
+				switch (plugin) {
+					case mdstore_mongodb_dump:
+						return new MongoDbDumpCollectorPlugin(this.fileSystem);
+					case mdstore_mongodb:
+						return new MDStoreCollectorPlugin();
+					default:
+						throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
+				}
+			default:
+				throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
 		}
 	}
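With these branches in place, the worker selects the new plugins purely from the ApiDescriptor. A minimal configuration sketch with hypothetical values, assuming the plain bean setters that ApiDescriptor exposes alongside the getBaseUrl()/getParams() calls visible above:

final ApiDescriptor api = new ApiDescriptor();
api.setProtocol("fileCSV"); // must match CollectorPlugin.NAME.fileCSV exactly
api.setBaseUrl("hdfs:///data/input.csv"); // example location, not from the commit
api.getParams().put("header", "true"); // the four params read by the new plugins
api.getParams().put("separator", ",");
api.getParams().put("identifier", "id");
api.getParams().put("quote", "\"");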

eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java

@@ -11,10 +11,21 @@ public interface CollectorPlugin {
 
 	enum NAME {
 
-		oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications, osfPreprints, zenodoDump;
+		oai,
+		other,
+		rest_json2xml,
+		file,
+		fileGzip,
+		baseDump,
+		gtr2Publications,
+		osfPreprints,
+		zenodoDump,
+		fileCSV,
+		httpCSV;
 
 		public enum OTHER_NAME {
-			mdstore_mongodb_dump, mdstore_mongodb
+			mdstore_mongodb_dump,
+			mdstore_mongodb
 		}
 	}
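Since CollectorWorker dispatches with CollectorPlugin.NAME.valueOf(this.api.getProtocol()), the protocol string registered for a datasource must match the new constants exactly; valueOf is case-sensitive and rejects unknown names:

CollectorPlugin.NAME.valueOf("fileCSV"); // NAME.fileCSV
CollectorPlugin.NAME.valueOf("filecsv"); // throws IllegalArgumentException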

eu/dnetlib/dhp/collection/plugin/csv/FileCsvCollectorPlugin.java

@@ -0,0 +1,42 @@
+package eu.dnetlib.dhp.collection.plugin.csv;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+
+public class FileCsvCollectorPlugin implements CollectorPlugin {
+
+	private final FileSystem fileSystem;
+
+	public FileCsvCollectorPlugin(final FileSystem fileSystem) {
+		this.fileSystem = fileSystem;
+	}
+
+	@Override
+	public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
+		final String baseUrl = api.getBaseUrl();
+		final String header = api.getParams().get("header");
+		final String separator = api.getParams().get("separator");
+		final String identifier = api.getParams().get("identifier");
+		final String quote = api.getParams().get("quote");
+
+		final Iterator<String> iterator = new FileCsvIterator(baseUrl, header, separator, identifier, quote, this.fileSystem);
+		final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
+
+		return StreamSupport.stream(spliterator, false);
+	}
+}

eu/dnetlib/dhp/collection/plugin/csv/FileCsvIterator.java

@@ -0,0 +1,26 @@
+package eu.dnetlib.dhp.collection.plugin.csv;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+
+public class FileCsvIterator implements Iterator<String> {
+
+	public FileCsvIterator(final String baseUrl, final String header, final String separator, final String identifier, final String quote,
+		final FileSystem fileSystem) {
+		// TODO Auto-generated constructor stub
+	}
+
+	@Override
+	public boolean hasNext() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public String next() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+}
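The iterator is committed as an Eclipse-generated stub, consistent with the commit title. For orientation, a minimal sketch of one way it could be filled in, reading the CSV from HDFS through the FileSystem handle it already receives; it assumes rows without embedded newlines, and the separator/identifier/quote handling is only hinted at because the commit does not yet define the record format:

package eu.dnetlib.dhp.collection.plugin.csv;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileCsvIterator implements Iterator<String> {

	private final BufferedReader reader;
	private String nextLine;

	public FileCsvIterator(final String baseUrl, final String header, final String separator, final String identifier, final String quote,
		final FileSystem fileSystem) {
		try {
			// baseUrl is interpreted as the HDFS path of the CSV file
			this.reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(baseUrl)), StandardCharsets.UTF_8));
			if (Boolean.parseBoolean(header)) {
				this.reader.readLine(); // skip the header row
			}
			this.nextLine = this.reader.readLine();
		} catch (final IOException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public boolean hasNext() {
		return this.nextLine != null;
	}

	@Override
	public String next() {
		final String current = this.nextLine;
		try {
			this.nextLine = this.reader.readLine();
		} catch (final IOException e) {
			throw new RuntimeException(e);
		}
		// a real implementation would split 'current' on the separator,
		// honour the quote character and expose the identifier column
		return current;
	}
}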

eu/dnetlib/dhp/collection/plugin/csv/HttpCsvCollectorPlugin.java

@@ -0,0 +1,41 @@
+package eu.dnetlib.dhp.collection.plugin.csv;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
+import eu.dnetlib.dhp.common.collection.CollectorException;
+import eu.dnetlib.dhp.common.collection.HttpClientParams;
+
+public class HttpCsvCollectorPlugin implements CollectorPlugin {
+
+	private final HttpClientParams clientParams;
+
+	public HttpCsvCollectorPlugin(final HttpClientParams clientParams) {
+		this.clientParams = clientParams;
+	}
+
+	@Override
+	public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
+		final String baseUrl = api.getBaseUrl();
+		final String header = api.getParams().get("header");
+		final String separator = api.getParams().get("separator");
+		final String identifier = api.getParams().get("identifier");
+		final String quote = api.getParams().get("quote");
+
+		final Iterator<String> iterator = new HttpCsvIterator(baseUrl, header, separator, identifier, quote, this.clientParams);
+		final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
+
+		return StreamSupport.stream(spliterator, false);
+	}
+}

eu/dnetlib/dhp/collection/plugin/csv/HttpCsvIterator.java

@@ -0,0 +1,26 @@
+package eu.dnetlib.dhp.collection.plugin.csv;
+
+import java.util.Iterator;
+
+import eu.dnetlib.dhp.common.collection.HttpClientParams;
+
+public class HttpCsvIterator implements Iterator<String> {
+
+	public HttpCsvIterator(final String baseUrl, final String header, final String separator, final String identifier, final String quote,
+		final HttpClientParams clientParams) {
+		// TODO Auto-generated constructor stub
+	}
+
+	@Override
+	public boolean hasNext() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	@Override
+	public String next() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+}
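A matching sketch for the HTTP variant, assuming the HttpConnector2 helper from eu.dnetlib.dhp.common.collection that other plugins in this codebase use for plain GET requests; the document is fetched once and iterated line by line, with the same caveats as the file-based sketch above:

package eu.dnetlib.dhp.collection.plugin.csv;

import java.util.Arrays;
import java.util.Iterator;

import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;

public class HttpCsvIterator implements Iterator<String> {

	private final Iterator<String> lines;

	public HttpCsvIterator(final String baseUrl, final String header, final String separator, final String identifier, final String quote,
		final HttpClientParams clientParams) {
		try {
			// download the whole CSV document in one call
			final String body = new HttpConnector2(clientParams).getInputSource(baseUrl);
			final Iterator<String> it = Arrays.asList(body.split("\\r?\\n")).iterator();
			if (Boolean.parseBoolean(header) && it.hasNext()) {
				it.next(); // skip the header row
			}
			this.lines = it;
		} catch (final Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public boolean hasNext() {
		return this.lines.hasNext();
	}

	@Override
	public String next() {
		// as in the file-based sketch, field parsing (separator, quote,
		// identifier) would happen here once the record format is defined
		return this.lines.next();
	}
}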