forked from D-Net/dnet-hadoop
merged from the osfPreprints_plugin branch
This commit is contained in:
commit
4e9f64e01a
|
@ -1,4 +1,3 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
|
@ -46,11 +46,11 @@ public class CollectorWorker extends ReportingJob {
|
||||||
private final HttpClientParams clientParams;
|
private final HttpClientParams clientParams;
|
||||||
|
|
||||||
public CollectorWorker(
|
public CollectorWorker(
|
||||||
final ApiDescriptor api,
|
final ApiDescriptor api,
|
||||||
final FileSystem fileSystem,
|
final FileSystem fileSystem,
|
||||||
final MDStoreVersion mdStoreVersion,
|
final MDStoreVersion mdStoreVersion,
|
||||||
final HttpClientParams clientParams,
|
final HttpClientParams clientParams,
|
||||||
final AggregatorReport report) {
|
final AggregatorReport report) {
|
||||||
super(report);
|
super(report);
|
||||||
this.api = api;
|
this.api = api;
|
||||||
this.fileSystem = fileSystem;
|
this.fileSystem = fileSystem;
|
||||||
|
@ -69,25 +69,22 @@ public class CollectorWorker extends ReportingJob {
|
||||||
scheduleReport(counter);
|
scheduleReport(counter);
|
||||||
|
|
||||||
try (SequenceFile.Writer writer = SequenceFile
|
try (SequenceFile.Writer writer = SequenceFile
|
||||||
.createWriter(
|
.createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
|
||||||
this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
|
.keyClass(IntWritable.class), SequenceFile.Writer
|
||||||
.keyClass(IntWritable.class),
|
.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
|
||||||
SequenceFile.Writer
|
|
||||||
.valueClass(Text.class),
|
|
||||||
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
|
|
||||||
final IntWritable key = new IntWritable(counter.get());
|
final IntWritable key = new IntWritable(counter.get());
|
||||||
final Text value = new Text();
|
final Text value = new Text();
|
||||||
plugin
|
plugin
|
||||||
.collect(this.api, this.report)
|
.collect(this.api, this.report)
|
||||||
.forEach(content -> {
|
.forEach(content -> {
|
||||||
key.set(counter.getAndIncrement());
|
key.set(counter.getAndIncrement());
|
||||||
value.set(content);
|
value.set(content);
|
||||||
try {
|
try {
|
||||||
writer.append(key, value);
|
writer.append(key, value);
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
this.report.put(e.getClass().getName(), e.getMessage());
|
this.report.put(e.getClass().getName(), e.getMessage());
|
||||||
throw new CollectorException(e);
|
throw new CollectorException(e);
|
||||||
|
@ -115,36 +112,36 @@ public class CollectorWorker extends ReportingJob {
|
||||||
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
|
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
|
||||||
|
|
||||||
switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
|
switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
|
||||||
case oai:
|
case oai:
|
||||||
return new OaiCollectorPlugin(this.clientParams);
|
return new OaiCollectorPlugin(this.clientParams);
|
||||||
case rest_json2xml:
|
case rest_json2xml:
|
||||||
return new RestCollectorPlugin(this.clientParams);
|
return new RestCollectorPlugin(this.clientParams);
|
||||||
case file:
|
case file:
|
||||||
return new FileCollectorPlugin(this.fileSystem);
|
return new FileCollectorPlugin(this.fileSystem);
|
||||||
case fileGzip:
|
case fileGzip:
|
||||||
return new FileGZipCollectorPlugin(this.fileSystem);
|
return new FileGZipCollectorPlugin(this.fileSystem);
|
||||||
case baseDump:
|
case baseDump:
|
||||||
return new BaseCollectorPlugin(this.fileSystem);
|
return new BaseCollectorPlugin(this.fileSystem);
|
||||||
case gtr2Publications:
|
case gtr2Publications:
|
||||||
return new Gtr2PublicationsCollectorPlugin(this.clientParams);
|
return new Gtr2PublicationsCollectorPlugin(this.clientParams);
|
||||||
case osfPreprints:
|
case osfPreprints:
|
||||||
return new OsfPreprintsCollectorPlugin(this.clientParams);
|
return new OsfPreprintsCollectorPlugin(this.clientParams);
|
||||||
case other:
|
case other:
|
||||||
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
|
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
|
||||||
.ofNullable(this.api.getParams().get("other_plugin_type"))
|
.ofNullable(this.api.getParams().get("other_plugin_type"))
|
||||||
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
|
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
|
||||||
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
|
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
|
||||||
|
|
||||||
switch (plugin) {
|
switch (plugin) {
|
||||||
case mdstore_mongodb_dump:
|
case mdstore_mongodb_dump:
|
||||||
return new MongoDbDumpCollectorPlugin(this.fileSystem);
|
return new MongoDbDumpCollectorPlugin(this.fileSystem);
|
||||||
case mdstore_mongodb:
|
case mdstore_mongodb:
|
||||||
return new MDStoreCollectorPlugin();
|
return new MDStoreCollectorPlugin();
|
||||||
default:
|
|
||||||
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
|
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,19 +31,17 @@ public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
|
||||||
final String baseUrl = api.getBaseUrl();
|
final String baseUrl = api.getBaseUrl();
|
||||||
|
|
||||||
final int pageSize = Optional
|
final int pageSize = Optional
|
||||||
.ofNullable(api.getParams().get("pageSize"))
|
.ofNullable(api.getParams().get("pageSize"))
|
||||||
.filter(StringUtils::isNotBlank)
|
.filter(StringUtils::isNotBlank)
|
||||||
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
|
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
|
||||||
.orElse(PAGE_SIZE_VALUE_DEFAULT);
|
.orElse(PAGE_SIZE_VALUE_DEFAULT);
|
||||||
|
|
||||||
if (StringUtils.isBlank(baseUrl)) {
|
if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); }
|
||||||
throw new CollectorException("Param 'baseUrl' is null or empty");
|
|
||||||
}
|
|
||||||
|
|
||||||
final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
|
final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
|
||||||
|
|
||||||
return StreamSupport
|
return StreamSupport
|
||||||
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
|
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public HttpClientParams getClientParams() {
|
public HttpClientParams getClientParams() {
|
||||||
|
|
|
@ -34,9 +34,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
||||||
private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
|
private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
|
||||||
|
|
||||||
public OsfPreprintsIterator(
|
public OsfPreprintsIterator(
|
||||||
final String baseUrl,
|
final String baseUrl,
|
||||||
final int pageSize,
|
final int pageSize,
|
||||||
final HttpClientParams clientParams) {
|
final HttpClientParams clientParams) {
|
||||||
|
|
||||||
this.clientParams = clientParams;
|
this.clientParams = clientParams;
|
||||||
this.baseUrl = baseUrl;
|
this.baseUrl = baseUrl;
|
||||||
|
@ -54,8 +54,7 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
synchronized (this.recordQueue) {
|
synchronized (this.recordQueue) {
|
||||||
while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl)
|
while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl) && this.currentUrl.startsWith("http")) {
|
||||||
&& this.currentUrl.startsWith("http")) {
|
|
||||||
try {
|
try {
|
||||||
this.currentUrl = downloadPage(this.currentUrl);
|
this.currentUrl = downloadPage(this.currentUrl);
|
||||||
} catch (final CollectorException e) {
|
} catch (final CollectorException e) {
|
||||||
|
@ -64,9 +63,7 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this.recordQueue.isEmpty()) {
|
if (!this.recordQueue.isEmpty()) { return true; }
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -115,9 +112,7 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Document downloadUrl(final String url, final int attempt) throws CollectorException {
|
private Document downloadUrl(final String url, final int attempt) throws CollectorException {
|
||||||
if (attempt > MAX_ATTEMPTS) {
|
if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); }
|
||||||
throw new CollectorException("Max Number of attempts reached, url:" + url);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (attempt > 0) {
|
if (attempt > 0) {
|
||||||
final int delay = (attempt * 5000);
|
final int delay = (attempt * 5000);
|
||||||
|
|
|
@ -35,18 +35,6 @@
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
,
|
,
|
||||||
{
|
|
||||||
"paramName": "wip",
|
|
||||||
"paramLongName": "webCrawlInputPath",
|
|
||||||
"paramDescription": "the path to get the input data from Web Crawl",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "pip",
|
|
||||||
"paramLongName": "publisherInputPath",
|
|
||||||
"paramDescription": "the path to get the input data from publishers",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"paramName": "o",
|
"paramName": "o",
|
||||||
"paramLongName": "outputPath",
|
"paramLongName": "outputPath",
|
||||||
|
|
|
@ -50,10 +50,9 @@ public class OsfPreprintsCollectorPluginTest {
|
||||||
@Test
|
@Test
|
||||||
@Disabled
|
@Disabled
|
||||||
void test_one() throws CollectorException {
|
void test_one() throws CollectorException {
|
||||||
this.plugin
|
this.plugin.collect(this.api, new AggregatorReport())
|
||||||
.collect(this.api, new AggregatorReport())
|
.limit(1)
|
||||||
.limit(1)
|
.forEach(log::info);
|
||||||
.forEach(log::info);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -96,8 +95,7 @@ public class OsfPreprintsCollectorPluginTest {
|
||||||
final HttpConnector2 connector = new HttpConnector2();
|
final HttpConnector2 connector = new HttpConnector2();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final String res = connector
|
final String res = connector.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
|
||||||
.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
|
|
||||||
System.out.println(res);
|
System.out.println(res);
|
||||||
fail();
|
fail();
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
|
|
|
@ -29,7 +29,7 @@ class JsonPathTest {
|
||||||
Assertions.assertNotNull(row);
|
Assertions.assertNotNull(row);
|
||||||
Assertions.assertTrue(StringUtils.isNotBlank(row.getAs("identifier")));
|
Assertions.assertTrue(StringUtils.isNotBlank(row.getAs("identifier")));
|
||||||
|
|
||||||
System.out.println("row = " + row.getAs("country"));
|
System.out.println("row = " + row.getAs("countrytitle"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue