commit 599e56dbc6
Miriam Baglioni 2024-09-25 17:28:23 +02:00

9 changed files with 132 additions and 73 deletions


@@ -212,11 +212,11 @@ public class HttpConnector2 {
                 .format(
                     "Unexpected status code: %s errors: %s", urlConn.getResponseCode(),
                     MAPPER.writeValueAsString(report)));
-        } catch (MalformedURLException | UnknownHostException e) {
+        } catch (MalformedURLException e) {
             log.error(e.getMessage(), e);
             report.put(e.getClass().getName(), e.getMessage());
             throw new CollectorException(e.getMessage(), e);
-        } catch (SocketTimeoutException | SocketException e) {
+        } catch (SocketTimeoutException | SocketException | UnknownHostException e) {
             log.error(e.getMessage(), e);
             report.put(e.getClass().getName(), e.getMessage());
             backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000);
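
The hunk above moves UnknownHostException out of the fail-fast branch (malformed URLs) and into the branch that backs off and retries, so transient DNS failures are treated like timeouts and socket resets rather than fatal errors. A minimal standalone sketch of that policy, assuming hypothetical doRequest, MAX_RETRIES and RETRY_DELAY_SECONDS stand-ins for the connector's real HTTP call and client parameters:

// Sketch only, not the project's HttpConnector2; names below are hypothetical.
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

public class RetrySketch {

    private static final int MAX_RETRIES = 3;
    private static final int RETRY_DELAY_SECONDS = 10;

    public static String fetch(final String url) throws Exception {
        for (int retryNumber = 1; retryNumber <= MAX_RETRIES; retryNumber++) {
            try {
                return doRequest(url);
            } catch (MalformedURLException e) {
                // not recoverable: a broken URL will never resolve, fail immediately
                throw e;
            } catch (SocketTimeoutException | SocketException | UnknownHostException e) {
                // transient network/DNS error: back off linearly, then try again
                Thread.sleep((long) RETRY_DELAY_SECONDS * retryNumber * 1000);
            }
        }
        throw new Exception("max number of retries reached for " + url);
    }

    private static String doRequest(final String url) throws Exception {
        return ""; // placeholder for the real HTTP GET
    }
}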


@@ -49,6 +49,9 @@ public class ReadCOCI implements Serializable {
        final String workingPath = parser.get("inputPath");
        log.info("workingPath {}", workingPath);

+       final String backupPath = parser.get("backupPath");
+       log.info("backupPath {}", backupPath);
+
        SparkConf sconf = new SparkConf();

        Configuration conf = new Configuration();
@@ -68,12 +71,14 @@ public class ReadCOCI implements Serializable {
                    workingPath,
                    fileSystem,
                    outputPath,
+                   backupPath,
                    delimiter);
            });
    }

    private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem,
        String outputPath,
+       String backupPath,
        String delimiter) throws IOException {
        RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
            .listFiles(
@@ -107,7 +112,8 @@ public class ReadCOCI implements Serializable {
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(outputPath);

-           fileSystem.rename(fileStatus.getPath(), new Path("/tmp/miriam/OC/DONE"));
+           fileSystem.rename(fileStatus.getPath(), new Path(backupPath));
        }
    }
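
With this change the hardcoded /tmp/miriam/OC/DONE destination disappears: once an OpenCitations input file has been written out as JSON, it is moved to a configurable backupPath on HDFS (the workflow further down passes ${inputPath}/backup for it). A minimal sketch of the move step, assuming a hypothetical markAsProcessed helper around Hadoop's FileSystem.rename:

// Sketch, assuming an already-configured HDFS FileSystem; markAsProcessed is hypothetical.
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BackupSketch {

    // Move an already-extracted input file out of the working dir so that a
    // re-run of the job does not process it twice.
    static void markAsProcessed(final FileSystem fs, final Path inputFile, final String backupPath)
        throws IOException {
        final Path target = new Path(backupPath);
        if (!fs.exists(target)) {
            fs.mkdirs(target);
        }
        fs.rename(inputFile, target);
    }
}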


@@ -46,11 +46,11 @@ public class CollectorWorker extends ReportingJob {
    private final HttpClientParams clientParams;

    public CollectorWorker(
        final ApiDescriptor api,
        final FileSystem fileSystem,
        final MDStoreVersion mdStoreVersion,
        final HttpClientParams clientParams,
        final AggregatorReport report) {
        super(report);
        this.api = api;
        this.fileSystem = fileSystem;
@@ -69,22 +69,25 @@ public class CollectorWorker extends ReportingJob {
        scheduleReport(counter);

        try (SequenceFile.Writer writer = SequenceFile
-           .createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-               .keyClass(IntWritable.class), SequenceFile.Writer
-               .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+           .createWriter(
+               this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+                   .keyClass(IntWritable.class),
+               SequenceFile.Writer
+                   .valueClass(Text.class),
+               SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
            final IntWritable key = new IntWritable(counter.get());
            final Text value = new Text();
            plugin
                .collect(this.api, this.report)
                .forEach(content -> {
                    key.set(counter.getAndIncrement());
                    value.set(content);
                    try {
                        writer.append(key, value);
                    } catch (final Throwable e) {
                        throw new RuntimeException(e);
                    }
                });
        } catch (final Throwable e) {
            this.report.put(e.getClass().getName(), e.getMessage());
            throw new CollectorException(e);
@@ -112,36 +115,36 @@ public class CollectorWorker extends ReportingJob {
    private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
        switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
            case oai:
                return new OaiCollectorPlugin(this.clientParams);
            case rest_json2xml:
                return new RestCollectorPlugin(this.clientParams);
            case file:
                return new FileCollectorPlugin(this.fileSystem);
            case fileGzip:
                return new FileGZipCollectorPlugin(this.fileSystem);
            case baseDump:
                return new BaseCollectorPlugin(this.fileSystem);
            case gtr2Publications:
                return new Gtr2PublicationsCollectorPlugin(this.clientParams);
            case osfPreprints:
                return new OsfPreprintsCollectorPlugin(this.clientParams);
            case other:
                final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
                    .ofNullable(this.api.getParams().get("other_plugin_type"))
                    .map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
                    .orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));

                switch (plugin) {
                    case mdstore_mongodb_dump:
                        return new MongoDbDumpCollectorPlugin(this.fileSystem);
                    case mdstore_mongodb:
                        return new MDStoreCollectorPlugin();
                    default:
                        throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
                }
            default:
                throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
        }
    }
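
These hunks are largely formatter output: the SequenceFile.Writer builder call is split over several lines and the nested switch keeps its existing fallbacks. For reference, a condensed sketch of the same writer pattern (block-compressed SequenceFile with IntWritable keys and Text values); the output path and record content below are hypothetical:

// Sketch of the SequenceFile writer pattern used above; "/tmp/example.seq" is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DeflateCodec;

public class SequenceFileSketch {

    public static void main(final String[] args) throws Exception {
        final Configuration conf = new Configuration();
        try (SequenceFile.Writer writer = SequenceFile
            .createWriter(
                conf,
                SequenceFile.Writer.file(new Path("/tmp/example.seq")),
                SequenceFile.Writer.keyClass(IntWritable.class),
                SequenceFile.Writer.valueClass(Text.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
            // one record per collected document: incremental key, raw content as value
            writer.append(new IntWritable(0), new Text("<record/>"));
        }
    }
}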


@@ -31,17 +31,19 @@ public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
        final String baseUrl = api.getBaseUrl();
        final int pageSize = Optional
            .ofNullable(api.getParams().get("pageSize"))
            .filter(StringUtils::isNotBlank)
            .map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
            .orElse(PAGE_SIZE_VALUE_DEFAULT);

-       if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); }
+       if (StringUtils.isBlank(baseUrl)) {
+           throw new CollectorException("Param 'baseUrl' is null or empty");
+       }

        final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());

        return StreamSupport
            .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public HttpClientParams getClientParams() {
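
The guard clause above is only reformatted: a missing baseUrl still fails fast, while a blank or non-numeric pageSize silently falls back to the default. A minimal sketch of that parameter handling, with hypothetical names (DEFAULT_PAGE_SIZE, and IllegalArgumentException standing in for the project's CollectorException):

// Sketch of the optional-parameter handling shown above; names are hypothetical.
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;

public class ParamSketch {

    private static final int DEFAULT_PAGE_SIZE = 100;

    // blank or non-numeric values fall back to the default page size
    static int pageSize(final Map<String, String> params) {
        return Optional
            .ofNullable(params.get("pageSize"))
            .filter(StringUtils::isNotBlank)
            .map(s -> NumberUtils.toInt(s, DEFAULT_PAGE_SIZE))
            .orElse(DEFAULT_PAGE_SIZE);
    }

    // a missing base URL is a configuration error, so fail immediately
    static String requireBaseUrl(final String baseUrl) {
        if (StringUtils.isBlank(baseUrl)) {
            throw new IllegalArgumentException("Param 'baseUrl' is null or empty");
        }
        return baseUrl;
    }
}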


@@ -5,6 +5,7 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;

+import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
@@ -33,9 +34,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
    private final Queue<String> recordQueue = new PriorityBlockingQueue<>();

    public OsfPreprintsIterator(
        final String baseUrl,
        final int pageSize,
        final HttpClientParams clientParams) {
        this.clientParams = clientParams;
        this.baseUrl = baseUrl;
@@ -46,13 +47,15 @@ public class OsfPreprintsIterator implements Iterator<String> {
    private void initQueue() {
        this.currentUrl = this.baseUrl + "?filter:is_published:d=true&format=json&page[size]=" + this.pageSize;
        log.info("REST calls starting with {}", this.currentUrl);
    }

    @Override
    public boolean hasNext() {
        synchronized (this.recordQueue) {
-           while (this.recordQueue.isEmpty() && !this.currentUrl.isEmpty()) {
+           while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl)
+               && this.currentUrl.startsWith("http")) {
                try {
                    this.currentUrl = downloadPage(this.currentUrl);
                } catch (final CollectorException e) {
@@ -61,7 +64,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
                }
            }

-           if (!this.recordQueue.isEmpty()) { return true; }
+           if (!this.recordQueue.isEmpty()) {
+               return true;
+           }

            return false;
        }
@@ -83,17 +88,23 @@ public class OsfPreprintsIterator implements Iterator<String> {
            final Element n = (Element) ((Element) o).detach();

            final Element group = DocumentHelper.createElement("group");
-           group.addAttribute("id", n.valueOf(".//data/id"));
+           group.addAttribute("id", n.valueOf("./id"));

            group.addElement("preprint").add(n);

            for (final Object o1 : n.selectNodes(".//contributors//href")) {
-               final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
-               group.addElement("contributors").add(doc1.getRootElement().detach());
+               final String href = ((Node) o1).getText();
+               if (StringUtils.isNotBlank(href) && href.startsWith("http")) {
+                   final Document doc1 = downloadUrl(href, 0);
+                   group.addElement("contributors").add(doc1.getRootElement().detach());
+               }
            }
            for (final Object o1 : n.selectNodes(".//primary_file//href")) {
-               final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
-               group.addElement("primary_file").add(doc1.getRootElement().detach());
+               final String href = ((Node) o1).getText();
+               if (StringUtils.isNotBlank(href) && href.startsWith("http")) {
+                   final Document doc1 = downloadUrl(href, 0);
+                   group.addElement("primary_file").add(doc1.getRootElement().detach());
+               }
            }

            this.recordQueue.add(DocumentHelper.createDocument(group).asXML());
@@ -104,7 +115,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
    }

    private Document downloadUrl(final String url, final int attempt) throws CollectorException {
-       if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); }
+       if (attempt > MAX_ATTEMPTS) {
+           throw new CollectorException("Max Number of attempts reached, url:" + url);
+       }

        if (attempt > 0) {
            final int delay = (attempt * 5000);
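
Several of these hunks add the same defensive pattern: before following a contributors or primary_file link the iterator now checks that the href is non-blank and looks like an absolute http(s) URL, and the paging loop stops when the next link is blank or not a URL, presumably to cope with null/empty links such as the {'next':null} case exercised by the new testXML test further down. A small sketch of that guard, with a hypothetical isFollowable helper name:

// Sketch of the URL guard added above; isFollowable is a hypothetical helper name.
import org.apache.commons.lang3.StringUtils;

public class HrefGuardSketch {

    // Follow a link only if it is non-blank and an absolute http/https URL;
    // anything else (empty string, "null", a relative fragment) is skipped.
    static boolean isFollowable(final String href) {
        return StringUtils.isNotBlank(href) && href.startsWith("http");
    }

    public static void main(final String[] args) {
        System.out.println(isFollowable("https://api.osf.io/v2/preprints/abcde/files/")); // true
        System.out.println(isFollowable(""));   // false
        System.out.println(isFollowable("null")); // false
    }
}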


@@ -24,12 +24,19 @@
    "paramLongName": "outputPath",
    "paramDescription": "the hdfs name node",
    "paramRequired": true
-  }, {
-    "paramName": "nn",
-    "paramLongName": "hdfsNameNode",
-    "paramDescription": "the hdfs name node",
-    "paramRequired": true
-  }
+  },
+  {
+    "paramName": "nn",
+    "paramLongName": "hdfsNameNode",
+    "paramDescription": "the hdfs name node",
+    "paramRequired": true
+  },
+  {
+    "paramName": "bp",
+    "paramLongName": "backupPath",
+    "paramDescription": "the hdfs path to move the OC data after the extraction",
+    "paramRequired": true
+  }
]


@@ -129,6 +129,7 @@
            </spark-opts>
            <arg>--inputPath</arg><arg>${inputPath}/Extracted</arg>
            <arg>--outputPath</arg><arg>${inputPath}/JSON</arg>
+           <arg>--backupPath</arg><arg>${inputPath}/backup</arg>
            <arg>--delimiter</arg><arg>${delimiter}</arg>
            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
        </spark>


@@ -504,6 +504,24 @@ case object Crossref2Oaf {
      )
    }

+    val is_review = json \ "relation" \ "is-review-of" \ "id"
+
+    if (is_review != JNothing) {
+      instance.setInstancetype(
+        OafMapperUtils.qualifier(
+          "0015",
+          "peerReviewed",
+          ModelConstants.DNET_REVIEW_LEVELS,
+          ModelConstants.DNET_REVIEW_LEVELS
+        )
+      )
+    }
+
+    if (doi.startsWith("10.3410") || doi.startsWith("10.12703"))
+      instance.setHostedby(
+        OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true), "H1Connect")
+      )
+
    instance.setAccessright(
      decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue)
    )
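
The Scala addition above does two things while mapping a Crossref record: when the record carries a relation.is-review-of link, the instance is typed as peer reviewed (qualifier 0015/peerReviewed in the dnet:review_levels vocabulary), and DOIs with the 10.3410 or 10.12703 prefixes get H1Connect as their hosting datasource. A hedged Java paraphrase of the same decision logic, with hypothetical holder and setter names standing in for the OAF Instance used in Crossref2Oaf:

// Java paraphrase of the Scala snippet above; Instance and its setters are hypothetical stand-ins.
public class ReviewMappingSketch {

    static void enrich(final boolean hasIsReviewOfRelation, final String doi, final Instance instance) {
        if (hasIsReviewOfRelation) {
            // a record that reviews another work is itself treated as peer reviewed
            instance.setInstanceType("0015", "peerReviewed");
        }
        if (doi.startsWith("10.3410") || doi.startsWith("10.12703")) {
            // DOI prefixes that the mapping pins to the H1Connect datasource
            instance.setHostedBy("openaire____::H1Connect", "H1Connect");
        }
    }

    // minimal stand-in type so the sketch is self-contained
    static class Instance {
        void setInstanceType(final String classId, final String className) {
        }

        void setHostedBy(final String id, final String name) {
        }
    }
}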


@@ -18,6 +18,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
@@ -49,9 +50,10 @@ public class OsfPreprintsCollectorPluginTest {
    @Test
    @Disabled
    void test_one() throws CollectorException {
-       this.plugin.collect(this.api, new AggregatorReport())
-           .limit(1)
-           .forEach(log::info);
+       this.plugin
+           .collect(this.api, new AggregatorReport())
+           .limit(1)
+           .forEach(log::info);
    }

    @Test
@@ -94,7 +96,8 @@ public class OsfPreprintsCollectorPluginTest {
        final HttpConnector2 connector = new HttpConnector2();

        try {
-           final String res = connector.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
+           final String res = connector
+               .getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
            System.out.println(res);
            fail();
        } catch (final Throwable e) {
@@ -110,4 +113,10 @@ public class OsfPreprintsCollectorPluginTest {
    }

+   @Test
+   void testXML() {
+       final String xml = JsonUtils.convertToXML("{'next':null}");
+       System.out.println(xml);
+   }
+
}