1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2024-09-25 15:27:32 +02:00
parent e354f9853a
commit 6397141e56
5 changed files with 75 additions and 61 deletions

View File

@ -46,11 +46,11 @@ public class CollectorWorker extends ReportingJob {
private final HttpClientParams clientParams;
public CollectorWorker(
final ApiDescriptor api,
final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams,
final AggregatorReport report) {
final ApiDescriptor api,
final FileSystem fileSystem,
final MDStoreVersion mdStoreVersion,
final HttpClientParams clientParams,
final AggregatorReport report) {
super(report);
this.api = api;
this.fileSystem = fileSystem;
@ -69,22 +69,25 @@ public class CollectorWorker extends ReportingJob {
scheduleReport(counter);
try (SequenceFile.Writer writer = SequenceFile
.createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
.keyClass(IntWritable.class), SequenceFile.Writer
.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
.createWriter(
this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
.keyClass(IntWritable.class),
SequenceFile.Writer
.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get());
final Text value = new Text();
plugin
.collect(this.api, this.report)
.forEach(content -> {
key.set(counter.getAndIncrement());
value.set(content);
try {
writer.append(key, value);
} catch (final Throwable e) {
throw new RuntimeException(e);
}
});
.collect(this.api, this.report)
.forEach(content -> {
key.set(counter.getAndIncrement());
value.set(content);
try {
writer.append(key, value);
} catch (final Throwable e) {
throw new RuntimeException(e);
}
});
} catch (final Throwable e) {
this.report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e);
@ -112,36 +115,36 @@ public class CollectorWorker extends ReportingJob {
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
case oai:
return new OaiCollectorPlugin(this.clientParams);
case rest_json2xml:
return new RestCollectorPlugin(this.clientParams);
case file:
return new FileCollectorPlugin(this.fileSystem);
case fileGzip:
return new FileGZipCollectorPlugin(this.fileSystem);
case baseDump:
return new BaseCollectorPlugin(this.fileSystem);
case gtr2Publications:
return new Gtr2PublicationsCollectorPlugin(this.clientParams);
case osfPreprints:
return new OsfPreprintsCollectorPlugin(this.clientParams);
case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
case oai:
return new OaiCollectorPlugin(this.clientParams);
case rest_json2xml:
return new RestCollectorPlugin(this.clientParams);
case file:
return new FileCollectorPlugin(this.fileSystem);
case fileGzip:
return new FileGZipCollectorPlugin(this.fileSystem);
case baseDump:
return new BaseCollectorPlugin(this.fileSystem);
case gtr2Publications:
return new Gtr2PublicationsCollectorPlugin(this.clientParams);
case osfPreprints:
return new OsfPreprintsCollectorPlugin(this.clientParams);
case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(this.api.getParams().get("other_plugin_type"))
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
switch (plugin) {
case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(this.fileSystem);
case mdstore_mongodb:
return new MDStoreCollectorPlugin();
switch (plugin) {
case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(this.fileSystem);
case mdstore_mongodb:
return new MDStoreCollectorPlugin();
default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
}
default:
throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
}
}

View File

@ -31,17 +31,19 @@ public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
final String baseUrl = api.getBaseUrl();
final int pageSize = Optional
.ofNullable(api.getParams().get("pageSize"))
.filter(StringUtils::isNotBlank)
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
.orElse(PAGE_SIZE_VALUE_DEFAULT);
.ofNullable(api.getParams().get("pageSize"))
.filter(StringUtils::isNotBlank)
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
.orElse(PAGE_SIZE_VALUE_DEFAULT);
if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); }
if (StringUtils.isBlank(baseUrl)) {
throw new CollectorException("Param 'baseUrl' is null or empty");
}
final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
}
public HttpClientParams getClientParams() {

View File

@ -34,9 +34,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
public OsfPreprintsIterator(
final String baseUrl,
final int pageSize,
final HttpClientParams clientParams) {
final String baseUrl,
final int pageSize,
final HttpClientParams clientParams) {
this.clientParams = clientParams;
this.baseUrl = baseUrl;
@ -54,7 +54,8 @@ public class OsfPreprintsIterator implements Iterator<String> {
@Override
public boolean hasNext() {
synchronized (this.recordQueue) {
while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl) && this.currentUrl.startsWith("http")) {
while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl)
&& this.currentUrl.startsWith("http")) {
try {
this.currentUrl = downloadPage(this.currentUrl);
} catch (final CollectorException e) {
@ -63,7 +64,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
}
}
if (!this.recordQueue.isEmpty()) { return true; }
if (!this.recordQueue.isEmpty()) {
return true;
}
return false;
}
@ -112,7 +115,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
}
private Document downloadUrl(final String url, final int attempt) throws CollectorException {
if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); }
if (attempt > MAX_ATTEMPTS) {
throw new CollectorException("Max Number of attempts reached, url:" + url);
}
if (attempt > 0) {
final int delay = (attempt * 5000);

View File

@ -517,8 +517,10 @@ case object Crossref2Oaf {
)
}
if(doi.startsWith("10.3410") || doi.startsWith("10.12703"))
instance.setHostedby(OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true),"H1Connect"))
if (doi.startsWith("10.3410") || doi.startsWith("10.12703"))
instance.setHostedby(
OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true), "H1Connect")
)
instance.setAccessright(
decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue)

View File

@ -50,9 +50,10 @@ public class OsfPreprintsCollectorPluginTest {
@Test
@Disabled
void test_one() throws CollectorException {
this.plugin.collect(this.api, new AggregatorReport())
.limit(1)
.forEach(log::info);
this.plugin
.collect(this.api, new AggregatorReport())
.limit(1)
.forEach(log::info);
}
@Test
@ -95,7 +96,8 @@ public class OsfPreprintsCollectorPluginTest {
final HttpConnector2 connector = new HttpConnector2();
try {
final String res = connector.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
final String res = connector
.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
System.out.println(res);
fail();
} catch (final Throwable e) {