package eu.dnetlib.dhp.collection.plugin.oai;

import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.commons.lang.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.SAXReader;
import org.dom4j.io.XMLWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.HttpConnector2;
import eu.dnetlib.dhp.collection.XmlCleaner;

/**
 * Iterator over the string items obtained from an OAI endpoint through the {@code ListRecords} verb.
 *
 * <p>The first request is issued lazily, on the first call to {@link #hasNext()} or {@link #next()}.
 * Further pages are requested from {@link #next()} for as long as a non-empty resumption token is
 * available. Both {@link #hasNext()} and {@link #next()} synchronize on the internal queue.
 */
public class OaiIterator implements Iterator<String> {

    private static final Logger log = LoggerFactory.getLogger(OaiIterator.class);

    private static final String REPORT_PREFIX = "oai:";

    /** Buffer of the items that have been downloaded but not yet handed out by {@link #next()}. */
    private final Queue<String> queue = new PriorityBlockingQueue<>();

    private final String baseUrl;
    private final String set;
    private final String mdFormat;
    private final String fromDate;
    private final String untilDate;

    /** Resumption token for the next page; {@code null} or empty means no further page is requested. */
    private String token;

    /** Becomes {@code true} as soon as the first-page request has been attempted. */
    private boolean started;

    private final HttpConnector2 httpConnector;
    private final AggregatorReport report;

    /**
     * Creates the iterator; no request is performed until the iterator is first used.
     *
     * @param baseUrl base URL the request parameters are appended to
     * @param mdFormat value sent as {@code metadataPrefix}
     * @param set value sent as {@code set}; skipped when {@code null} or empty
     * @param fromDate value sent as {@code from}; skipped when {@code null} or not matching the accepted
     *        date patterns
     * @param untilDate value sent as {@code until}; skipped when {@code null} or not matching the
     *        accepted date patterns
     * @param httpConnector connector stored for the page downloads
     * @param report report receiving the errors raised while building the first request
     */
    public OaiIterator(
        final String baseUrl,
        final String mdFormat,
        final String set,
        final String fromDate,
        final String untilDate,
        final HttpConnector2 httpConnector,
        final AggregatorReport report) {
        this.baseUrl = baseUrl;
        this.mdFormat = mdFormat;
        this.set = set;
        this.fromDate = fromDate;
        this.untilDate = untilDate;
        this.started = false;
        this.httpConnector = httpConnector;
        this.report = report;
    }

    /**
     * Requests the first page the first time it is invoked; later invocations do nothing.
     *
     * @throws RuntimeException wrapping the {@link CollectorException} raised by the first request
     */
    private void verifyStarted() {
        if (this.started) {
            return;
        }
        // The flag is raised before the request, so a failing first page is not requested again.
        this.started = true;
        try {
            this.token = firstPage();
        } catch (final CollectorException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the internal queue currently holds at least one item, requesting the first page
     * beforehand if that has not happened yet.
     */
    @Override
    public boolean hasNext() {
        synchronized (queue) {
            verifyStarted();
            return !queue.isEmpty();
        }
    }

    /**
     * Takes the head of the internal queue and then, while the queue is empty and a non-empty
     * resumption token is available, requests the following pages.
     *
     * @return the polled item, or {@code null} when the queue was empty at the time of the call
     * @throws RuntimeException wrapping the {@link CollectorException} raised while requesting a page
     */
    @Override
    public String next() {
        synchronized (queue) {
            verifyStarted();
            final String res = queue.poll();
            // Refill eagerly so that the following hasNext() call can rely on the queue content alone.
            while (queue.isEmpty() && token != null && !token.isEmpty()) {
                try {
                    token = otherPages(token);
                } catch (final CollectorException e) {
                    throw new RuntimeException(e);
                }
            }
            return res;
        }
    }

    /** Does nothing: invoking it neither removes an item nor raises an exception. */
    @Override
    public void remove() {
    }

    /**
     * Builds the {@code ListRecords} URL from the configured parameters and downloads it.
     *
     * <p>{@code set} is appended only when non-empty; {@code from} and {@code until} are appended only
     * when they match {@code OaiCollectorPlugin.DATE_REGEX} or
     * {@code OaiCollectorPlugin.UTC_DATETIME_REGEX}.
     *
     * @return the value returned by {@code downloadPage} for the first URL
     * @throws CollectorException if the UTF-8 encoding is not supported (the error is also stored in
     *         the report) or if it is raised by {@code downloadPage}
     */
    private String firstPage() throws CollectorException {
        try {
            String url = baseUrl + "?verb=ListRecords&metadataPrefix=" + URLEncoder.encode(mdFormat, "UTF-8");
            if (set != null && !set.isEmpty()) {
                url += "&set=" + URLEncoder.encode(set, "UTF-8");
            }
            if (fromDate != null
                && (fromDate.matches(OaiCollectorPlugin.DATE_REGEX)
                    || fromDate.matches(OaiCollectorPlugin.UTC_DATETIME_REGEX))) {
                url += "&from=" + URLEncoder.encode(fromDate, "UTF-8");
            }
            if (untilDate != null
                && (untilDate.matches(OaiCollectorPlugin.DATE_REGEX)
                    || untilDate.matches(OaiCollectorPlugin.UTC_DATETIME_REGEX))) {
                url += "&until=" + URLEncoder.encode(untilDate, "UTF-8");
            }
            log.info("Start harvesting using url: {}", url);

            return downloadPage(url);
        } catch (final UnsupportedEncodingException e) {
            report.put(e.getClass().getName(), e.getMessage());
            throw new CollectorException(e);
        }
    }

    private String extractResumptionToken(final String xml) { final String s = StringUtils.substringAfter(xml, "", "