1
0
Fork 0

Merge pull request 'osfPreprints_plugin' (#480) from osfPreprints_plugin into beta

Reviewed-on: D-Net/dnet-hadoop#480
This commit is contained in:
Michele Artini 2024-09-20 09:56:50 +02:00
commit db6f137cf9
9 changed files with 336 additions and 160 deletions

View File

@ -26,6 +26,7 @@ import eu.dnetlib.dhp.collection.plugin.gtr2.Gtr2PublicationsCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.osf.OsfPreprintsCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.rest.RestCollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.common.collection.CollectorException;
@ -68,12 +69,9 @@ public class CollectorWorker extends ReportingJob {
scheduleReport(counter); scheduleReport(counter);
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer .keyClass(IntWritable.class), SequenceFile.Writer
.keyClass(IntWritable.class), .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
SequenceFile.Writer
.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get()); final IntWritable key = new IntWritable(counter.get());
final Text value = new Text(); final Text value = new Text();
plugin plugin
@ -126,6 +124,8 @@ public class CollectorWorker extends ReportingJob {
return new BaseCollectorPlugin(this.fileSystem); return new BaseCollectorPlugin(this.fileSystem);
case gtr2Publications: case gtr2Publications:
return new Gtr2PublicationsCollectorPlugin(this.clientParams); return new Gtr2PublicationsCollectorPlugin(this.clientParams);
case osfPreprints:
return new OsfPreprintsCollectorPlugin(this.clientParams);
case other: case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(this.api.getParams().get("other_plugin_type")) .ofNullable(this.api.getParams().get("other_plugin_type"))

View File

@ -11,7 +11,7 @@ public interface CollectorPlugin {
enum NAME { enum NAME {
oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications; oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications, osfPreprints;
public enum OTHER_NAME { public enum OTHER_NAME {
mdstore_mongodb_dump, mdstore_mongodb mdstore_mongodb_dump, mdstore_mongodb

View File

@ -29,9 +29,12 @@ public class Gtr2PublicationsCollectorPlugin implements CollectorPlugin {
final String endPage = api.getParams().get("endPage"); final String endPage = api.getParams().get("endPage");
final String fromDate = api.getParams().get("fromDate"); final String fromDate = api.getParams().get("fromDate");
if ((fromDate != null) && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) { throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate); } if ((fromDate != null) && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) {
throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate);
}
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseUrl, fromDate, startPage, endPage, this.clientParams); final Iterator<String> iterator = new Gtr2PublicationsIterator(baseUrl, fromDate, startPage, endPage,
this.clientParams);
final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
return StreamSupport.stream(spliterator, false); return StreamSupport.stream(spliterator, false);

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.gtr2; package eu.dnetlib.dhp.collection.plugin.gtr2;
import java.util.ArrayList; import java.util.ArrayList;
@ -48,7 +49,8 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
private String nextElement; private String nextElement;
public Gtr2PublicationsIterator(final String baseUrl, final String fromDate, final String startPage, final String endPage, public Gtr2PublicationsIterator(final String baseUrl, final String fromDate, final String startPage,
final String endPage,
final HttpClientParams clientParams) final HttpClientParams clientParams)
throws CollectorException { throws CollectorException {
@ -120,7 +122,8 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
return res; return res;
} }
private void addLinkedEntities(final Element master, final String relType, final Element newRoot, final Function<Document, Element> mapper) { private void addLinkedEntities(final Element master, final String relType, final Element newRoot,
final Function<Document, Element> mapper) {
for (final Object o : master.selectNodes(".//*[local-name()='link']")) { for (final Object o : master.selectNodes(".//*[local-name()='link']")) {
final String rel = ((Element) o).valueOf("@*[local-name()='rel']"); final String rel = ((Element) o).valueOf("@*[local-name()='rel']");
@ -165,7 +168,9 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
private Element asProjectElement(final Document doc) { private Element asProjectElement(final Document doc) {
final Element newOrg = DocumentHelper.createElement("project"); final Element newOrg = DocumentHelper.createElement("project");
newOrg.addElement("id").setText(doc.valueOf("/*/@*[local-name()='id']")); newOrg.addElement("id").setText(doc.valueOf("/*/@*[local-name()='id']"));
newOrg.addElement("code").setText(doc.valueOf("//*[local-name()='identifier' and @*[local-name()='type'] = 'RCUK']")); newOrg
.addElement("code")
.setText(doc.valueOf("//*[local-name()='identifier' and @*[local-name()='type'] = 'RCUK']"));
newOrg.addElement("title").setText(doc.valueOf("//*[local-name()='title']")); newOrg.addElement("title").setText(doc.valueOf("//*[local-name()='title']"));
return newOrg; return newOrg;
} }
@ -188,7 +193,9 @@ public class Gtr2PublicationsIterator implements Iterator<String> {
return DocumentHelper.parseText(new String(bytes)); return DocumentHelper.parseText(new String(bytes));
} catch (final Throwable e) { } catch (final Throwable e) {
log.error("Error dowloading url: " + cleanUrl + ", attempt = " + attempt, e); log.error("Error dowloading url: " + cleanUrl + ", attempt = " + attempt, e);
if (attempt >= MAX_ATTEMPTS) { throw new RuntimeException("Error dowloading url: " + cleanUrl, e); } if (attempt >= MAX_ATTEMPTS) {
throw new RuntimeException("Error dowloading url: " + cleanUrl, e);
}
try { try {
Thread.sleep(60000); // I wait for a minute Thread.sleep(60000); // I wait for a minute
} catch (final InterruptedException e1) { } catch (final InterruptedException e1) {

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.collection.plugin.osf;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class OsfPreprintsCollectorPlugin implements CollectorPlugin {
public static final int PAGE_SIZE_VALUE_DEFAULT = 100;
private final HttpClientParams clientParams;
public OsfPreprintsCollectorPlugin(final HttpClientParams clientParams) {
this.clientParams = clientParams;
}
@Override
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final int pageSize = Optional
.ofNullable(api.getParams().get("pageSize"))
.filter(StringUtils::isNotBlank)
.map(s -> NumberUtils.toInt(s, PAGE_SIZE_VALUE_DEFAULT))
.orElse(PAGE_SIZE_VALUE_DEFAULT);
if (StringUtils.isBlank(baseUrl)) { throw new CollectorException("Param 'baseUrl' is null or empty"); }
final OsfPreprintsIterator it = new OsfPreprintsIterator(baseUrl, pageSize, getClientParams());
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
}
public HttpClientParams getClientParams() {
return this.clientParams;
}
}

View File

@ -0,0 +1,133 @@
package eu.dnetlib.dhp.collection.plugin.osf;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.dom4j.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class OsfPreprintsIterator implements Iterator<String> {
private static final Logger log = LoggerFactory.getLogger(OsfPreprintsIterator.class);
private static final int MAX_ATTEMPTS = 5;
private final HttpClientParams clientParams;
private final String baseUrl;
private final int pageSize;
private String currentUrl;
private final Queue<String> recordQueue = new PriorityBlockingQueue<>();
public OsfPreprintsIterator(
final String baseUrl,
final int pageSize,
final HttpClientParams clientParams) {
this.clientParams = clientParams;
this.baseUrl = baseUrl;
this.pageSize = pageSize;
initQueue();
}
private void initQueue() {
this.currentUrl = this.baseUrl + "?filter:is_published:d=true&format=json&page[size]=" + this.pageSize;
log.info("REST calls starting with {}", this.currentUrl);
}
@Override
public boolean hasNext() {
synchronized (this.recordQueue) {
while (this.recordQueue.isEmpty() && !this.currentUrl.isEmpty()) {
try {
this.currentUrl = downloadPage(this.currentUrl);
} catch (final CollectorException e) {
log.debug("CollectorPlugin.next()-Exception: {}", e);
throw new RuntimeException(e);
}
}
if (!this.recordQueue.isEmpty()) { return true; }
return false;
}
}
@Override
public String next() {
synchronized (this.recordQueue) {
return this.recordQueue.poll();
}
}
private String downloadPage(final String url) throws CollectorException {
final Document doc = downloadUrl(url, 0);
for (final Object o : doc.selectNodes("/*/data")) {
final Element n = (Element) ((Element) o).detach();
final Element group = DocumentHelper.createElement("group");
group.addAttribute("id", n.valueOf(".//data/id"));
group.addElement("preprint").add(n);
for (final Object o1 : n.selectNodes(".//contributors//href")) {
final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
group.addElement("contributors").add(doc1.getRootElement().detach());
}
for (final Object o1 : n.selectNodes(".//primary_file//href")) {
final Document doc1 = downloadUrl(((Node) o1).getText(), 0);
group.addElement("primary_file").add(doc1.getRootElement().detach());
}
this.recordQueue.add(DocumentHelper.createDocument(group).asXML());
}
return doc.valueOf("/*/links/next");
}
private Document downloadUrl(final String url, final int attempt) throws CollectorException {
if (attempt > MAX_ATTEMPTS) { throw new CollectorException("Max Number of attempts reached, url:" + url); }
if (attempt > 0) {
final int delay = (attempt * 5000);
log.debug("Attempt {} with delay {}", attempt, delay);
try {
Thread.sleep(delay);
} catch (final InterruptedException e) {
new CollectorException(e);
}
}
try {
log.info("requesting URL [{}]", url);
final HttpConnector2 connector = new HttpConnector2(this.clientParams);
final String json = connector.getInputSource(url);
final String xml = JsonUtils.convertToXML(json);
return DocumentHelper.parseText(xml);
} catch (final Throwable e) {
log.warn(e.getMessage(), e);
return downloadUrl(url, attempt + 1);
}
}
}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.gtr2; package eu.dnetlib.dhp.collection.plugin.gtr2;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -55,7 +56,8 @@ class Gtr2PublicationsIteratorTest {
@Disabled @Disabled
public void testIncrementalHarvestingNoRecords() throws Exception { public void testIncrementalHarvestingNoRecords() throws Exception {
System.out.println("incremental Harvesting"); System.out.println("incremental Harvesting");
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, "2050-12-12T", "11", "13", clientParams); final Iterator<String> iterator = new Gtr2PublicationsIterator(baseURL, "2050-12-12T", "11", "13",
clientParams);
final int count = iterateAndCount(iterator); final int count = iterateAndCount(iterator);
assertEquals(0, count); assertEquals(0, count);
} }

View File

@ -0,0 +1,86 @@
package eu.dnetlib.dhp.collection.plugin.osf;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class OsfPreprintsCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(OsfPreprintsCollectorPlugin.class);
private final String baseUrl = "https://api.osf.io/v2/preprints/";
private final int pageSize = 100;
private final ApiDescriptor api = new ApiDescriptor();
private OsfPreprintsCollectorPlugin plugin;
@BeforeEach
public void setUp() {
final HashMap<String, String> params = new HashMap<>();
params.put("pageSize", "" + this.pageSize);
this.api.setBaseUrl(this.baseUrl);
this.api.setParams(params);
this.plugin = new OsfPreprintsCollectorPlugin(new HttpClientParams());
}
@Test
@Disabled
void test_one() throws CollectorException {
this.plugin.collect(this.api, new AggregatorReport())
.limit(1)
.forEach(log::info);
}
@Test
// @Disabled
void test_limited() throws CollectorException {
final AtomicInteger i = new AtomicInteger(0);
final Stream<String> stream = this.plugin.collect(this.api, new AggregatorReport());
stream.limit(2000).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
i.incrementAndGet();
log.info(s);
});
log.info("{}", i.intValue());
Assertions.assertTrue(i.intValue() > 0);
}
@Test
@Disabled
void test_all() throws CollectorException {
final AtomicLong i = new AtomicLong(0);
final Stream<String> stream = this.plugin.collect(this.api, new AggregatorReport());
stream.forEach(s -> {
Assertions.assertTrue(s.length() > 0);
if ((i.incrementAndGet() % 1000) == 0) {
log.info("COLLECTED: {}", i.get());
}
});
log.info("TOTAL: {}", i.get());
Assertions.assertTrue(i.get() > 0);
}
}

View File

@ -1,105 +0,0 @@
package eu.dnetlib.dhp.collection.plugin.rest;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class OsfPreprintCollectorTest {
private static final Logger log = LoggerFactory.getLogger(OsfPreprintCollectorTest.class);
private final String baseUrl = "https://api.osf.io/v2/preprints/";
// private final String requestHeaderMap = "";
// private final String authMethod = "";
// private final String authToken = "";
// private final String resultOutputFormat = "";
private final String queryParams = "filter:is_published:d=true";
private final String entityXpath = "/*/*[local-name()='data']";
private final String resultTotalXpath = "/*/*[local-name()='links']/*[local-name()='meta']/*[local-name()='total']";
private final String resumptionParam = "page";
private final String resumptionType = "scan";
private final String resumptionXpath = "substring-before(substring-after(/*/*[local-name()='links']/*[local-name()='next'], 'page='), '&')";
private final String resultSizeParam = "page[size]";
private final String resultSizeValue = "100";
private final String resultFormatParam = "format";
private final String resultFormatValue = "json";
private final ApiDescriptor api = new ApiDescriptor();
private RestCollectorPlugin rcp;
@BeforeEach
public void setUp() {
final HashMap<String, String> params = new HashMap<>();
params.put("resumptionType", this.resumptionType);
params.put("resumptionParam", this.resumptionParam);
params.put("resumptionXpath", this.resumptionXpath);
params.put("resultTotalXpath", this.resultTotalXpath);
params.put("resultFormatParam", this.resultFormatParam);
params.put("resultFormatValue", this.resultFormatValue);
params.put("resultSizeParam", this.resultSizeParam);
params.put("resultSizeValue", this.resultSizeValue);
params.put("queryParams", this.queryParams);
params.put("entityXpath", this.entityXpath);
this.api.setBaseUrl(this.baseUrl);
this.api.setParams(params);
this.rcp = new RestCollectorPlugin(new HttpClientParams());
}
@Test
@Disabled
void test_limited() throws CollectorException {
final AtomicInteger i = new AtomicInteger(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
stream.limit(2000).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
i.incrementAndGet();
log.info(s);
});
log.info("{}", i.intValue());
Assertions.assertTrue(i.intValue() > 0);
}
@Test
@Disabled
void test_all() throws CollectorException {
final AtomicLong i = new AtomicLong(0);
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
stream.forEach(s -> {
Assertions.assertTrue(s.length() > 0);
if ((i.incrementAndGet() % 1000) == 0) {
log.info("COLLECTED: {}", i.get());
}
});
log.info("TOTAL: {}", i.get());
Assertions.assertTrue(i.get() > 0);
}
}