diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java
index 78bb99e79a..d2e53f11a6 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java
@@ -212,11 +212,11 @@ public class HttpConnector2 {
                     .format(
                         "Unexpected status code: %s errors: %s", urlConn.getResponseCode(),
                         MAPPER.writeValueAsString(report)));
-        } catch (MalformedURLException | UnknownHostException e) {
+        } catch (MalformedURLException e) {
            log.error(e.getMessage(), e);
            report.put(e.getClass().getName(), e.getMessage());
            throw new CollectorException(e.getMessage(), e);
-        } catch (SocketTimeoutException | SocketException e) {
+        } catch (SocketTimeoutException | SocketException | UnknownHostException e) {
            log.error(e.getMessage(), e);
            report.put(e.getClass().getName(), e.getMessage());
            backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000);
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java
index 479aea4589..4b0bbf145b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java
@@ -49,6 +49,9 @@ public class ReadCOCI implements Serializable {
        final String workingPath = parser.get("inputPath");
        log.info("workingPath {}", workingPath);
 
+       final String backupPath = parser.get("backupPath");
+       log.info("backupPath {}", backupPath);
+
        SparkConf sconf = new SparkConf();
 
        Configuration conf = new Configuration();
@@ -68,12 +71,14 @@ public class ReadCOCI implements Serializable {
                    workingPath,
                    fileSystem,
                    outputPath,
+                   backupPath,
                    delimiter);
            });
    }
 
    private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem, String outputPath,
+       String backupPath,
        String delimiter) throws IOException {
        RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
            .listFiles(
@@ -107,7 +112,8 @@ public class ReadCOCI implements Serializable {
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(outputPath);
-           fileSystem.rename(fileStatus.getPath(), new Path("/tmp/miriam/OC/DONE"));
+
+           fileSystem.rename(fileStatus.getPath(), new Path(backupPath));
        }
    }
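Reviewer note, not part of the patch: the HttpConnector2 hunk reclassifies UnknownHostException from a fatal error into the retryable bucket, so a transient DNS failure no longer aborts a whole collection run, while ReadCOCI replaces the hard-coded /tmp/miriam/OC/DONE destination with the new backupPath parameter. Below is a minimal sketch of the retry classification; the class and method names are illustrative only, and it assumes getRetryDelay() is expressed in seconds, as the * 1000 factor suggests.

```java
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

// Illustrative sketch, not project code: the error classification the
// HttpConnector2 hunk switches to.
public class RetryClassificationSketch {

    // Transient network failures are retried; UnknownHostException now
    // lands here instead of aborting the collection.
    static boolean isRetryable(final Exception e) {
        return e instanceof SocketTimeoutException
            || e instanceof SocketException
            || e instanceof UnknownHostException;
    }

    // Mirrors backoffAndSleep(getClientParams().getRetryDelay() * retryNumber * 1000):
    // the wait grows linearly with the attempt number.
    static long backoffMillis(final int retryDelaySeconds, final int retryNumber) {
        return retryDelaySeconds * retryNumber * 1000L;
    }

    public static void main(final String[] args) {
        System.out.println(isRetryable(new UnknownHostException("api.osf.io"))); // true
        System.out.println(backoffMillis(10, 3)); // 30000 ms before the third retry
    }
}
```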
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
index 67e07ba594..f63bfcb48d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java
@@ -46,11 +46,11 @@ public class CollectorWorker extends ReportingJob {
    private final HttpClientParams clientParams;
 
    public CollectorWorker(
-       final ApiDescriptor api,
-       final FileSystem fileSystem,
-       final MDStoreVersion mdStoreVersion,
-       final HttpClientParams clientParams,
-       final AggregatorReport report) {
+           final ApiDescriptor api,
+           final FileSystem fileSystem,
+           final MDStoreVersion mdStoreVersion,
+           final HttpClientParams clientParams,
+           final AggregatorReport report) {
        super(report);
        this.api = api;
        this.fileSystem = fileSystem;
@@ -69,22 +69,25 @@ public class CollectorWorker extends ReportingJob {
        scheduleReport(counter);
 
        try (SequenceFile.Writer writer = SequenceFile
-           .createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-               .keyClass(IntWritable.class), SequenceFile.Writer
-               .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+           .createWriter(
+               this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+                   .keyClass(IntWritable.class),
+               SequenceFile.Writer
+                   .valueClass(Text.class),
+               SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
            final IntWritable key = new IntWritable(counter.get());
            final Text value = new Text();
            plugin
-               .collect(this.api, this.report)
-               .forEach(content -> {
-                   key.set(counter.getAndIncrement());
-                   value.set(content);
-                   try {
-                       writer.append(key, value);
-                   } catch (final Throwable e) {
-                       throw new RuntimeException(e);
-                   }
-               });
+                   .collect(this.api, this.report)
+                   .forEach(content -> {
+                       key.set(counter.getAndIncrement());
+                       value.set(content);
+                       try {
+                           writer.append(key, value);
+                       } catch (final Throwable e) {
+                           throw new RuntimeException(e);
+                       }
+                   });
        } catch (final Throwable e) {
            this.report.put(e.getClass().getName(), e.getMessage());
            throw new CollectorException(e);
@@ -112,36 +115,36 @@ public class CollectorWorker extends ReportingJob {
 
    private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
 
        switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
-       case oai:
-           return new OaiCollectorPlugin(this.clientParams);
-       case rest_json2xml:
-           return new RestCollectorPlugin(this.clientParams);
-       case file:
-           return new FileCollectorPlugin(this.fileSystem);
-       case fileGzip:
-           return new FileGZipCollectorPlugin(this.fileSystem);
-       case baseDump:
-           return new BaseCollectorPlugin(this.fileSystem);
-       case gtr2Publications:
-           return new Gtr2PublicationsCollectorPlugin(this.clientParams);
-       case osfPreprints:
-           return new OsfPreprintsCollectorPlugin(this.clientParams);
-       case other:
-           final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
+           case oai:
+               return new OaiCollectorPlugin(this.clientParams);
+           case rest_json2xml:
+               return new RestCollectorPlugin(this.clientParams);
+           case file:
+               return new FileCollectorPlugin(this.fileSystem);
+           case fileGzip:
+               return new FileGZipCollectorPlugin(this.fileSystem);
+           case baseDump:
+               return new BaseCollectorPlugin(this.fileSystem);
+           case gtr2Publications:
+               return new Gtr2PublicationsCollectorPlugin(this.clientParams);
+           case osfPreprints:
+               return new OsfPreprintsCollectorPlugin(this.clientParams);
+           case other:
+               final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
                    .ofNullable(this.api.getParams().get("other_plugin_type"))
                    .map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
                    .orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
-           switch (plugin) {
-           case mdstore_mongodb_dump:
-               return new MongoDbDumpCollectorPlugin(this.fileSystem);
-           case mdstore_mongodb:
-               return new MDStoreCollectorPlugin();
+               switch (plugin) {
+                   case mdstore_mongodb_dump:
+                       return new MongoDbDumpCollectorPlugin(this.fileSystem);
+                   case mdstore_mongodb:
+                       return new MDStoreCollectorPlugin();
+                   default:
+                       throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
+               }
            default:
-               throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
-           }
-       default:
-           throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
+               throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
        }
    }
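Reviewer note, not part of the patch: the CollectorWorker hunks are formatter churn only (re-indented constructor arguments, builder chains, and switch arms). The diff renders the nested switch confusingly because the two default arms share a context line, but both before and after the change an unmapped other_plugin_type throws UnknownCollectorPluginException. A hypothetical sketch of how a caller reaches the nested switch; the ApiDescriptor setter names are assumptions inferred from the getters the hunk calls.

```java
import java.util.HashMap;
import java.util.Map;

import eu.dnetlib.dhp.collection.ApiDescriptor;

// Hypothetical usage sketch: selecting the nested "other" plugin type.
public class PluginSelectionSketch {
    public static void main(final String[] args) {
        final ApiDescriptor api = new ApiDescriptor();
        api.setProtocol("other"); // outer switch arm
        final Map<String, String> params = new HashMap<>();
        // inner switch arm; any other value hits the inner default and
        // throws UnknownCollectorPluginException("plugin is not managed: ...")
        params.put("other_plugin_type", "mdstore_mongodb");
        api.setParams(params);
    }
}
```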
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPlugin.java
index fdc9df06f9..b0787eb450 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPlugin.java
@@ -31,17 +31,19 @@ public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
        final String baseUrl = api.getBaseUrl();
 
        final int pageSize = Optional
-           .ofNullable(api.getParams().get("pageSize"))
-           .filter(StringUtils::isNotBlank)
-           .map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
-           .orElse(PAGE_SIZE_VALUE_DEFAULT);
+               .ofNullable(api.getParams().get("pageSize"))
+               .filter(StringUtils::isNotBlank)
+               .map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
+               .orElse(PAGE_SIZE_VALUE_DEFAULT);
 
-       if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); }
+       if (StringUtils.isBlank(baseUrl)) {
+           throw new CollectorException("Param 'baseUrl' is null or empty");
+       }
 
        final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
 
        return StreamSupport
-           .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
+               .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }
 
    public HttpClientParams getClientParams() {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsIterator.java
index 9484297d0a..76adba1a8a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsIterator.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsIterator.java
@@ -5,6 +5,7 @@ import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.PriorityBlockingQueue;
 
+import org.apache.commons.lang3.StringUtils;
 import org.dom4j.Document;
 import org.dom4j.DocumentHelper;
 import org.dom4j.Element;
@@ -33,9 +34,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
    private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
 
    public OsfPreprintsIterator(
-       final String baseUrl,
-       final int pageSize,
-       final HttpClientParams clientParams) {
+           final String baseUrl,
+           final int pageSize,
+           final HttpClientParams clientParams) {
 
        this.clientParams = clientParams;
        this.baseUrl = baseUrl;
@@ -46,13 +47,15 @@ public class OsfPreprintsIterator implements Iterator<String> {
 
    private void initQueue() {
        this.currentUrl = this.baseUrl + "?filter:is_published:d=true&format=json&page[size]=" + this.pageSize;
+       log.info("REST calls starting with {}", this.currentUrl);
    }
 
    @Override
    public boolean hasNext() {
        synchronized (this.recordQueue) {
-           while (this.recordQueue.isEmpty() && !this.currentUrl.isEmpty()) {
+           while (this.recordQueue.isEmpty() && StringUtils.isNotBlank(this.currentUrl)
+               && this.currentUrl.startsWith("http")) {
                try {
                    this.currentUrl = downloadPage(this.currentUrl);
                } catch (final CollectorException e) {
@@ -61,7 +64,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
                }
            }
 
-           if (!this.recordQueue.isEmpty()) { return true; }
+           if (!this.recordQueue.isEmpty()) {
+               return true;
+           }
 
            return false;
        }
@@ -83,17 +88,23 @@ public class OsfPreprintsIterator implements Iterator<String> {
            final Element n = (Element) ((Element) o).detach();
 
            final Element group = DocumentHelper.createElement("group");
-           group.addAttribute("id", n.valueOf(".//data/id"));
+           group.addAttribute("id", n.valueOf("./id"));
 
            group.addElement("preprint").add(n);
 
            for (final Object o1 : n.selectNodes(".//contributors//href")) {
-               final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
-               group.addElement("contributors").add(doc1.getRootElement().detach());
+               final String href = ((Node) o1).getText();
+               if (StringUtils.isNotBlank(href) && href.startsWith("http")) {
+                   final Document doc1 = downloadUrl(href, 0);
+                   group.addElement("contributors").add(doc1.getRootElement().detach());
+               }
            }
 
            for (final Object o1 : n.selectNodes(".//primary_file//href")) {
-               final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
-               group.addElement("primary_file").add(doc1.getRootElement().detach());
+               final String href = ((Node) o1).getText();
+               if (StringUtils.isNotBlank(href) && href.startsWith("http")) {
+                   final Document doc1 = downloadUrl(href, 0);
+                   group.addElement("primary_file").add(doc1.getRootElement().detach());
+               }
            }
 
            this.recordQueue.add(DocumentHelper.createDocument(group).asXML());
@@ -104,7 +115,9 @@ public class OsfPreprintsIterator implements Iterator<String> {
    }
 
    private Document downloadUrl(final String url, final int attempt) throws CollectorException {
-       if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); }
+       if (attempt > MAX_ATTEMPTS) {
+           throw new CollectorException("Max Number of attempts reached, url:" + url);
+       }
 
        if (attempt > 0) {
            final int delay = (attempt * 5000);
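Reviewer note, not part of the patch: the substantive fix in the two OSF files above is loop termination. OSF's paginated JSON carries a null next link on the last page, and after conversion to XML that link can surface as a blank or non-URL string (the new testXML case at the bottom of this patch exercises exactly {'next':null}), so the old !currentUrl.isEmpty() guard could keep looping or hand downloadPage a non-URL. The same startsWith("http") defence is applied to contributor and primary-file hrefs. A minimal sketch of the guard; the class and method names are illustrative.

```java
import org.apache.commons.lang3.StringUtils;

// Minimal sketch of the termination guard introduced in hasNext().
public class NextLinkGuardSketch {

    static boolean shouldFetch(final String currentUrl) {
        return StringUtils.isNotBlank(currentUrl) && currentUrl.startsWith("http");
    }

    public static void main(final String[] args) {
        System.out.println(shouldFetch("https://api.osf.io/v2/preprints/?page=2")); // true: follow the link
        System.out.println(shouldFetch("null")); // false: JSON null leaked in as text, stop paging
        System.out.println(shouldFetch(""));     // false: stop paging
    }
}
```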
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json
index a74ceb983e..d1f495d678 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json
@@ -24,12 +24,19 @@
     "paramLongName": "outputPath",
     "paramDescription": "the hdfs name node",
     "paramRequired": true
-  }, {
-  "paramName": "nn",
-  "paramLongName": "hdfsNameNode",
-  "paramDescription": "the hdfs name node",
-  "paramRequired": true
-}
+  },
+  {
+    "paramName": "nn",
+    "paramLongName": "hdfsNameNode",
+    "paramDescription": "the hdfs name node",
+    "paramRequired": true
+  },
+  {
+    "paramName": "bp",
+    "paramLongName": "backupPath",
+    "paramDescription": "the hdfs path to move the OC data after the extraction",
+    "paramRequired": true
+  }
 ]
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml
index 566cf7d028..f170af96fe 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml
@@ -129,6 +129,7 @@
             <arg>--inputPath</arg><arg>${inputPath}/Extracted</arg>
             <arg>--outputPath</arg><arg>${inputPath}/JSON</arg>
+            <arg>--backupPath</arg><arg>${inputPath}/backup</arg>
             <arg>--delimiter</arg><arg>${delimiter}</arg>
             <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
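Reviewer note, not part of the patch: the two resource files wire the new parameter through, making backupPath required and defaulting it in the workflow to ${inputPath}/backup, which ReadCOCI uses in place of the hard-coded /tmp/miriam/OC/DONE. One caveat worth flagging: FileSystem.rename reports failure by returning false rather than throwing, and renaming a file onto a directory path only moves it into that directory if the directory already exists. A small sketch with made-up paths:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the post-processing move done per COCI file; paths are illustrative.
public class BackupMoveSketch {
    public static void main(final String[] args) throws Exception {
        final FileSystem fs = FileSystem.get(new Configuration());
        final Path backup = new Path("/data/oc/backup");
        if (!fs.exists(backup)) {
            fs.mkdirs(backup); // rename into a missing directory would just return false
        }
        // with an existing target directory, HDFS rename moves the source file into it
        final boolean moved = fs.rename(new Path("/data/oc/Extracted/part-00000"), backup);
        System.out.println("moved: " + moved);
    }
}
```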
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala
index ebe72ae5b3..4bd6bcc098 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/Crossref2Oaf.scala
@@ -504,6 +504,24 @@ case object Crossref2Oaf {
       )
     }
 
+    val is_review = json \ "relation" \ "is-review-of" \ "id"
+
+    if (is_review != JNothing) {
+      instance.setInstancetype(
+        OafMapperUtils.qualifier(
+          "0015",
+          "peerReviewed",
+          ModelConstants.DNET_REVIEW_LEVELS,
+          ModelConstants.DNET_REVIEW_LEVELS
+        )
+      )
+    }
+
+    if (doi.startsWith("10.3410") || doi.startsWith("10.12703"))
+      instance.setHostedby(
+        OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true), "H1Connect")
+      )
+
     instance.setAccessright(
       decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue)
     )
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPluginTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPluginTest.java
index efba0c72e5..664b84d5aa 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPluginTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/osf/OsfPreprintsCollectorPluginTest.java
@@ -18,6 +18,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.collection.ApiDescriptor;
+import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
 import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.collection.CollectorException;
 import eu.dnetlib.dhp.common.collection.HttpClientParams;
@@ -49,9 +50,10 @@ public class OsfPreprintsCollectorPluginTest {
    @Test
    @Disabled
    void test_one() throws CollectorException {
-       this.plugin.collect(this.api, new AggregatorReport())
-           .limit(1)
-           .forEach(log::info);
+       this.plugin
+           .collect(this.api, new AggregatorReport())
+           .limit(1)
+           .forEach(log::info);
    }
 
    @Test
@@ -94,7 +96,8 @@ public class OsfPreprintsCollectorPluginTest {
        final HttpConnector2 connector = new HttpConnector2();
 
        try {
-           final String res = connector.getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
+           final String res = connector
+               .getInputSource("https://api.osf.io/v2/preprints/ydtzx/contributors/?format=json");
            System.out.println(res);
            fail();
        } catch (final Throwable e) {
@@ -110,4 +113,10 @@ public class OsfPreprintsCollectorPluginTest {
 
    }
 
+   @Test
+   void testXML() {
+       final String xml = JsonUtils.convertToXML("{'next':null}");
+       System.out.println(xml);
+   }
+
 }
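Reviewer note, not part of the patch: two mapping tweaks close it out. Crossref records carrying a relation.is-review-of link are now typed as instance type 0015/peerReviewed, and DOIs with the 10.3410 or 10.12703 prefixes (Faculty Opinions / F1000Prime material, now published under the H1Connect brand) are pinned to the H1Connect hosted-by datasource. Below is an illustrative Java restatement of the two checks; only the prefixes, the type codes, and the datasource name come from the hunk, everything else is a sketch.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative Java restatement of the two Crossref checks; the Scala
// original uses json4s, whose \ operator also digs through arrays of objects.
public class CrossrefMappingSketch {

    // true when the record carries any relation.is-review-of entry with an id
    static boolean isReview(final JsonNode root) {
        for (final JsonNode rel : root.path("relation").path("is-review-of")) {
            if (rel.hasNonNull("id")) {
                return true;
            }
        }
        return false;
    }

    static boolean isH1Connect(final String doi) {
        return doi.startsWith("10.3410") || doi.startsWith("10.12703");
    }

    public static void main(final String[] args) throws Exception {
        final JsonNode rec = new ObjectMapper()
            .readTree("{\"relation\":{\"is-review-of\":[{\"id\":\"10.1234/abc\"}]}}");
        System.out.println(isReview(rec));                                 // true -> 0015 "peerReviewed"
        System.out.println(isH1Connect("10.3410/f.718539484.793497192")); // true -> hostedby H1Connect
    }
}
```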