forked from D-Net/dnet-hadoop
partial implementation of osfPreprints plugin + tests
This commit is contained in:
parent
dcf09811a2
commit
9073b1159d
|
@ -3,8 +3,6 @@ package eu.dnetlib.dhp.collection.plugin.osf;
|
|||
|
||||
import java.io.InputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
|
@ -31,18 +29,18 @@ import org.xml.sax.InputSource;
|
|||
import eu.dnetlib.dhp.collection.plugin.utils.JsonUtils;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
||||
import eu.dnetlib.dhp.common.collection.HttpConnector2;
|
||||
|
||||
public class OsfPreprintsIterator implements Iterator<String> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(OsfPreprintsIterator.class);
|
||||
public static final String UTF_8 = "UTF-8";
|
||||
|
||||
private static final int MAX_ATTEMPTS = 5;
|
||||
|
||||
private final HttpClientParams clientParams;
|
||||
|
||||
private static final String XML_HEADER = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
|
||||
private static final String EMPTY_XML = XML_HEADER + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG
|
||||
+ ">";
|
||||
private static final String EMPTY_XML = XML_HEADER + "<" + JsonUtils.XML_WRAP_TAG + "></" + JsonUtils.XML_WRAP_TAG + ">";
|
||||
|
||||
private final String baseUrl;
|
||||
private final int pageSize;
|
||||
|
@ -91,10 +89,6 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
|||
log.info("REST calls starting with {}", this.query);
|
||||
}
|
||||
|
||||
private void disconnect() {
|
||||
// TODO close inputstream
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
@ -114,7 +108,6 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
|||
|
||||
if (!this.recordQueue.isEmpty()) { return true; }
|
||||
|
||||
disconnect();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -158,15 +151,12 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
|||
try {
|
||||
log.info("requesting URL [{}]", query);
|
||||
|
||||
final URL qUrl = new URL(query);
|
||||
final HttpConnector2 connector = new HttpConnector2(this.clientParams);
|
||||
|
||||
final HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
|
||||
conn.setRequestMethod("GET");
|
||||
this.resultStream = conn.getInputStream();
|
||||
|
||||
resultJson = IOUtils.toString(this.resultStream, StandardCharsets.UTF_8);
|
||||
resultJson = connector.getInputSource(query);
|
||||
resultXml = JsonUtils.convertToXML(resultJson);
|
||||
this.resultStream = IOUtils.toInputStream(resultXml, UTF_8);
|
||||
|
||||
this.resultStream = IOUtils.toInputStream(resultXml, StandardCharsets.UTF_8);
|
||||
|
||||
if (!isEmptyXml(resultXml)) {
|
||||
resultNode = (Node) this.xpath
|
||||
|
@ -178,8 +168,7 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
|||
this.transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
|
||||
final String toEnqueue = sw.toString();
|
||||
if ((toEnqueue == null) || StringUtils.isBlank(toEnqueue) || isEmptyXml(toEnqueue)) {
|
||||
log
|
||||
.warn("The following record resulted in empty item for the feeding queue: {}", resultXml);
|
||||
log.warn("The following record resulted in empty item for the feeding queue: {}", resultXml);
|
||||
} else {
|
||||
this.recordQueue.add(sw.toString());
|
||||
}
|
||||
|
@ -213,8 +202,6 @@ public class OsfPreprintsIterator implements Iterator<String> {
|
|||
+ this.resumptionStr;
|
||||
} else {
|
||||
nextQuery = "";
|
||||
// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; } // correct the
|
||||
// resumptionInt and prevent a NullPointer Exception at mdStore
|
||||
}
|
||||
log.debug("nextQueryUrl: " + nextQuery);
|
||||
return nextQuery;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.collection.plugin.rest;
|
||||
package eu.dnetlib.dhp.collection.plugin.osf;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -18,9 +18,9 @@ import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
|||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
||||
|
||||
public class OsfPreprintCollectorTest {
|
||||
public class OsfPreprintsCollectorPluginTest {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(OsfPreprintCollectorTest.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(OsfPreprintsCollectorPlugin.class);
|
||||
|
||||
private final String baseUrl = "https://api.osf.io/v2/preprints/";
|
||||
|
||||
|
@ -29,50 +29,28 @@ public class OsfPreprintCollectorTest {
|
|||
// private final String authToken = "";
|
||||
// private final String resultOutputFormat = "";
|
||||
|
||||
private final String queryParams = "filter:is_published:d=true";
|
||||
|
||||
private final String entityXpath = "/*/*[local-name()='data']";
|
||||
|
||||
private final String resultTotalXpath = "/*/*[local-name()='links']/*[local-name()='meta']/*[local-name()='total']";
|
||||
|
||||
private final String resumptionParam = "page";
|
||||
private final String resumptionType = "scan";
|
||||
private final String resumptionXpath = "substring-before(substring-after(/*/*[local-name()='links']/*[local-name()='next'], 'page='), '&')";
|
||||
|
||||
private final String resultSizeParam = "page[size]";
|
||||
private final String resultSizeValue = "100";
|
||||
|
||||
private final String resultFormatParam = "format";
|
||||
private final String resultFormatValue = "json";
|
||||
private final int pageSize = 100;
|
||||
|
||||
private final ApiDescriptor api = new ApiDescriptor();
|
||||
private RestCollectorPlugin rcp;
|
||||
|
||||
private OsfPreprintsCollectorPlugin plugin;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
final HashMap<String, String> params = new HashMap<>();
|
||||
params.put("resumptionType", this.resumptionType);
|
||||
params.put("resumptionParam", this.resumptionParam);
|
||||
params.put("resumptionXpath", this.resumptionXpath);
|
||||
params.put("resultTotalXpath", this.resultTotalXpath);
|
||||
params.put("resultFormatParam", this.resultFormatParam);
|
||||
params.put("resultFormatValue", this.resultFormatValue);
|
||||
params.put("resultSizeParam", this.resultSizeParam);
|
||||
params.put("resultSizeValue", this.resultSizeValue);
|
||||
params.put("queryParams", this.queryParams);
|
||||
params.put("entityXpath", this.entityXpath);
|
||||
params.put("pageSize", "" + this.pageSize);
|
||||
|
||||
this.api.setBaseUrl(this.baseUrl);
|
||||
this.api.setParams(params);
|
||||
|
||||
this.rcp = new RestCollectorPlugin(new HttpClientParams());
|
||||
this.plugin = new OsfPreprintsCollectorPlugin(new HttpClientParams());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
void test_limited() throws CollectorException {
|
||||
final AtomicInteger i = new AtomicInteger(0);
|
||||
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
|
||||
final Stream<String> stream = this.plugin.collect(this.api, new AggregatorReport());
|
||||
|
||||
stream.limit(2000).forEach(s -> {
|
||||
Assertions.assertTrue(s.length() > 0);
|
||||
|
@ -88,7 +66,7 @@ public class OsfPreprintCollectorTest {
|
|||
@Disabled
|
||||
void test_all() throws CollectorException {
|
||||
final AtomicLong i = new AtomicLong(0);
|
||||
final Stream<String> stream = this.rcp.collect(this.api, new AggregatorReport());
|
||||
final Stream<String> stream = this.plugin.collect(this.api, new AggregatorReport());
|
||||
|
||||
stream.forEach(s -> {
|
||||
Assertions.assertTrue(s.length() > 0);
|
Loading…
Reference in New Issue