diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index 98caa17416..5021b7727e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -22,6 +22,7 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin; +import eu.dnetlib.dhp.collection.plugin.gtr2.Gtr2PublicationsCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; @@ -58,7 +59,7 @@ public class CollectorWorker extends ReportingJob { public void collect() throws UnknownCollectorPluginException, CollectorException, IOException { - final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME; + final String outputPath = this.mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME; log.info("outputPath path is {}", outputPath); final CollectorPlugin plugin = getCollectorPlugin(); @@ -68,36 +69,36 @@ public class CollectorWorker extends ReportingJob { try (SequenceFile.Writer writer = SequenceFile .createWriter( - fileSystem.getConf(), - SequenceFile.Writer.file(new Path(outputPath)), - SequenceFile.Writer.keyClass(IntWritable.class), - SequenceFile.Writer.valueClass(Text.class), + this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer + .keyClass(IntWritable.class), + SequenceFile.Writer + .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) { 
final IntWritable key = new IntWritable(counter.get()); final Text value = new Text(); plugin - .collect(api, report) - .forEach( - content -> { - key.set(counter.getAndIncrement()); - value.set(content); - try { - writer.append(key, value); - } catch (Throwable e) { - throw new RuntimeException(e); - } - }); - } catch (Throwable e) { - report.put(e.getClass().getName(), e.getMessage()); + .collect(this.api, this.report) + .forEach(content -> { + key.set(counter.getAndIncrement()); + value.set(content); + try { + writer.append(key, value); + } catch (final Throwable e) { + throw new RuntimeException(e); + } + }); + } catch (final Throwable e) { + this.report.put(e.getClass().getName(), e.getMessage()); throw new CollectorException(e); } finally { shutdown(); - report.ongoing(counter.longValue(), counter.longValue()); + this.report.ongoing(counter.longValue(), counter.longValue()); } } - private void scheduleReport(AtomicInteger counter) { + private void scheduleReport(final AtomicInteger counter) { schedule(new ReporterCallback() { + @Override public Long getCurrent() { return counter.longValue(); @@ -112,33 +113,35 @@ public class CollectorWorker extends ReportingJob { private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException { - switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) { + switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) { case oai: - return new OaiCollectorPlugin(clientParams); + return new OaiCollectorPlugin(this.clientParams); case rest_json2xml: - return new RestCollectorPlugin(clientParams); + return new RestCollectorPlugin(this.clientParams); case file: - return new FileCollectorPlugin(fileSystem); + return new FileCollectorPlugin(this.fileSystem); case fileGzip: - return new FileGZipCollectorPlugin(fileSystem); + return new FileGZipCollectorPlugin(this.fileSystem); case baseDump: return new BaseCollectorPlugin(this.fileSystem); + case gtr2Publications: + return new 
Gtr2PublicationsCollectorPlugin(this.clientParams); case other: final CollectorPlugin.NAME.OTHER_NAME plugin = Optional - .ofNullable(api.getParams().get("other_plugin_type")) + .ofNullable(this.api.getParams().get("other_plugin_type")) .map(CollectorPlugin.NAME.OTHER_NAME::valueOf) .orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type")); switch (plugin) { case mdstore_mongodb_dump: - return new MongoDbDumpCollectorPlugin(fileSystem); + return new MongoDbDumpCollectorPlugin(this.fileSystem); case mdstore_mongodb: return new MDStoreCollectorPlugin(); default: throw new UnknownCollectorPluginException("plugin is not managed: " + plugin); } default: - throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol()); + throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol()); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index 97d2d25854..0bba8d7641 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -11,7 +11,7 @@ public interface CollectorPlugin { enum NAME { - oai, other, rest_json2xml, file, fileGzip, baseDump; + oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications; public enum OTHER_NAME { mdstore_mongodb_dump, mdstore_mongodb diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsCollectorPlugin.java new file mode 100644 index 0000000000..543bcbe5a8 --- /dev/null +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsCollectorPlugin.java @@ -0,0 +1,40 @@ + +package eu.dnetlib.dhp.collection.plugin.gtr2; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import eu.dnetlib.dhp.collection.ApiDescriptor; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; +import eu.dnetlib.dhp.common.collection.CollectorException; +import eu.dnetlib.dhp.common.collection.HttpClientParams; + +public class Gtr2PublicationsCollectorPlugin implements CollectorPlugin { + + private final HttpClientParams clientParams; + + public Gtr2PublicationsCollectorPlugin(final HttpClientParams clientParams) { + this.clientParams = clientParams; + } + + @Override + public Stream collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException { + + final String baseUrl = api.getBaseUrl(); + final String startPage = api.getParams().get("startPage"); + final String endPage = api.getParams().get("endPage"); + final String fromDate = api.getParams().get("fromDate"); + + if ((fromDate != null) && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) { throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate); } + + final Iterator iterator = new Gtr2PublicationsIterator(baseUrl, fromDate, startPage, endPage, this.clientParams); + final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); + + return StreamSupport.stream(spliterator, false); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsIterator.java new file mode 100644 index 0000000000..9b122bbe69 --- /dev/null +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsIterator.java @@ -0,0 +1,208 @@ +package eu.dnetlib.dhp.collection.plugin.gtr2; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.function.Function; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.DocumentHelper; +import org.dom4j.Element; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.common.collection.CollectorException; +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import eu.dnetlib.dhp.common.collection.HttpConnector2; + +public class Gtr2PublicationsIterator implements Iterator { + + public static final int PAGE_SIZE = 20; + + private static final Logger log = LoggerFactory.getLogger(Gtr2PublicationsIterator.class); + + private final HttpConnector2 connector; + private static final DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd"); + + private static final int MAX_ATTEMPTS = 10; + + private final String baseUrl; + private int currPage; + private int endPage; + private boolean incremental = false; + private DateTime fromDate; + + private final Map cache = new HashMap<>(); + + private final Queue queue = new LinkedList<>(); + + private String nextElement; + + public Gtr2PublicationsIterator(final String baseUrl, final String fromDate, final String startPage, final String endPage, + final HttpClientParams clientParams) + throws CollectorException { + + this.baseUrl = baseUrl; + this.currPage = NumberUtils.toInt(startPage, 1); + this.endPage = NumberUtils.toInt(endPage, 
Integer.MAX_VALUE); + this.incremental = StringUtils.isNotBlank(fromDate); + this.connector = new HttpConnector2(clientParams); + + if (this.incremental) { + this.fromDate = parseDate(fromDate); + } + + prepareNextElement(); + } + + @Override + public boolean hasNext() { + return this.nextElement != null; + } + + @Override + public String next() { + try { + return this.nextElement; + } finally { + prepareNextElement(); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private void prepareNextElement() { + while ((this.currPage <= this.endPage) && this.queue.isEmpty()) { + log.debug("FETCHING PAGE " + this.currPage + "/" + this.endPage); + this.queue.addAll(fetchPage(this.currPage++)); + } + this.nextElement = this.queue.poll(); + } + + private List fetchPage(final int pageNumber) { + + final List res = new ArrayList<>(); + try { + final Document doc = loadURL(cleanURL(this.baseUrl + "/outcomes/publications?p=" + pageNumber), 0); + + if (this.endPage == Integer.MAX_VALUE) { + this.endPage = NumberUtils.toInt(doc.valueOf("/*/@*[local-name() = 'totalPages']")); + } + + for (final Object po : doc.selectNodes("//*[local-name() = 'publication']")) { + final Element mainEntity = (Element) ((Element) po).detach(); + + if (filterIncremental(mainEntity)) { + res.add(expandMainEntity(mainEntity)); + } else { + log.debug("Skipped entity"); + } + + } + } catch (final Throwable e) { + log.error("Exception fetching page " + pageNumber, e); + throw new RuntimeException("Exception fetching page " + pageNumber, e); + } + + return res; + } + + private void addLinkedEntities(final Element master, final String relType, final Element newRoot, final Function mapper) { + + for (final Object o : master.selectNodes(".//*[local-name()='link']")) { + final String rel = ((Element) o).valueOf("@*[local-name()='rel']"); + final String href = ((Element) o).valueOf("@*[local-name()='href']"); + + if (relType.equals(rel) && 
StringUtils.isNotBlank(href)) { + final String cacheKey = relType + "#" + href; + if (this.cache.containsKey(cacheKey)) { + try { + log.debug(" * from cache (" + relType + "): " + href); + newRoot.add(DocumentHelper.parseText(this.cache.get(cacheKey)).getRootElement()); + } catch (final DocumentException e) { + log.error("Error retrieving cache element: " + cacheKey, e); + throw new RuntimeException("Error retrieving cache element: " + cacheKey, e); + } + } else { + final Document doc = loadURL(cleanURL(href), 0); + final Element elem = mapper.apply(doc); + newRoot.add(elem); + this.cache.put(cacheKey, elem.asXML()); + } + + } + } + } + + private boolean filterIncremental(final Element e) { + if (!this.incremental || isAfter(e.valueOf("@*[local-name() = 'created']"), this.fromDate) + || isAfter(e.valueOf("@*[local-name() = 'updated']"), this.fromDate)) { + return true; + } + return false; + } + + private String expandMainEntity(final Element mainEntity) { + final Element newRoot = DocumentHelper.createElement("doc"); + newRoot.add(mainEntity); + addLinkedEntities(mainEntity, "PROJECT", newRoot, this::asProjectElement); + return DocumentHelper.createDocument(newRoot).asXML(); + } + + private Element asProjectElement(final Document doc) { + final Element newOrg = DocumentHelper.createElement("project"); + newOrg.addElement("id").setText(doc.valueOf("/*/@*[local-name()='id']")); + newOrg.addElement("code").setText(doc.valueOf("//*[local-name()='identifier' and @*[local-name()='type'] = 'RCUK']")); + newOrg.addElement("title").setText(doc.valueOf("//*[local-name()='title']")); + return newOrg; + } + + private static String cleanURL(final String url) { + String cleaned = url; + if (cleaned.contains("gtr.gtr")) { + cleaned = cleaned.replace("gtr.gtr", "gtr"); + } + if (cleaned.startsWith("http://")) { + cleaned = cleaned.replaceFirst("http://", "https://"); + } + return cleaned; + } + + private Document loadURL(final String cleanUrl, final int attempt) { + try { + 
log.debug(" * Downloading Url: " + cleanUrl); + final byte[] bytes = this.connector.getInputSource(cleanUrl).getBytes("UTF-8"); + return DocumentHelper.parseText(new String(bytes)); + } catch (final Throwable e) { + log.error("Error downloading url: " + cleanUrl + ", attempt = " + attempt, e); + if (attempt >= MAX_ATTEMPTS) { throw new RuntimeException("Error downloading url: " + cleanUrl, e); } + try { + Thread.sleep(60000); // I wait for a minute + } catch (final InterruptedException e1) { + throw new RuntimeException("Error downloading url: " + cleanUrl, e); + } + return loadURL(cleanUrl, attempt + 1); + } + } + + private DateTime parseDate(final String s) { + return DateTime.parse(s.contains("T") ? s.substring(0, s.indexOf("T")) : s, simpleDateTimeFormatter); + } + + private boolean isAfter(final String d, final DateTime fromDate) { + return StringUtils.isNotBlank(d) && parseDate(d).isAfter(fromDate); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsIteratorTest.java new file mode 100644 index 0000000000..f02feb0eff --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/gtr2/Gtr2PublicationsIteratorTest.java @@ -0,0 +1,101 @@ +package eu.dnetlib.dhp.collection.plugin.gtr2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Iterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.common.collection.HttpClientParams; + +class Gtr2PublicationsIteratorTest { + + private static final String baseURL = "https://gtr.ukri.org/gtr/api"; + + private static final HttpClientParams clientParams = new HttpClientParams(); + + @Test + @Disabled + public void testOne() throws Exception { + 
System.out.println("one publication"); + + final Iterator iterator = new Gtr2PublicationsIterator(baseURL, null, null, null, clientParams); + + if (iterator.hasNext()) { + final String res = iterator.next(); + assertNotNull(res); + System.out.println(res); + } + } + + @Test + @Disabled + public void testPaging() throws Exception { + final Iterator iterator = new Gtr2PublicationsIterator(baseURL, null, "2", "2", clientParams); + + while (iterator.hasNext()) { + Thread.sleep(300); + final String res = iterator.next(); + assertNotNull(res); + System.out.println(res); + } + } + + @Test + @Disabled + public void testOnePage() throws Exception { + final Iterator iterator = new Gtr2PublicationsIterator(baseURL, null, "12", "12", clientParams); + final int count = iterateAndCount(iterator); + assertEquals(20, count); + } + + @Test + @Disabled + public void testIncrementalHarvestingNoRecords() throws Exception { + System.out.println("incremental Harvesting"); + final Iterator iterator = new Gtr2PublicationsIterator(baseURL, "2050-12-12T", "11", "13", clientParams); + final int count = iterateAndCount(iterator); + assertEquals(0, count); + } + + @Test + @Disabled + public void testIncrementalHarvesting() throws Exception { + System.out.println("incremental Harvesting"); + final Iterator iterator = new Gtr2PublicationsIterator(baseURL, "2016-11-30", "11", "11", clientParams); + final int count = iterateAndCount(iterator); + assertEquals(20, count); + } + + @Test + @Disabled + public void testCompleteHarvesting() throws Exception { + System.out.println("testing complete harvesting"); + final Iterator iterator = new Gtr2PublicationsIterator(baseURL, null, null, null, clientParams); + // TryIndentXmlString indenter = new TryIndentXmlString(); + // it.setEndAtPage(3); + + while (iterator.hasNext()) { + final String res = iterator.next(); + assertNotNull(res); + // System.out.println(res); + // Scanner keyboard = new Scanner(System.in); + // System.out.println("press enter for 
next record"); + // keyboard.nextLine(); + + } + } + + private int iterateAndCount(final Iterator iterator) throws Exception { + int i = 0; + while (iterator.hasNext()) { + assertNotNull(iterator.next()); + i++; + } + System.out.println("Got " + i + " publications"); + return i; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index 44004faf31..3c8f5cef50 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -20,7 +20,6 @@ import javax.xml.transform.*; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; -import eu.dnetlib.dhp.oa.provision.model.*; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -42,6 +41,7 @@ import com.google.common.collect.Sets; import com.mycila.xmltool.XMLDoc; import com.mycila.xmltool.XMLTag; +import eu.dnetlib.dhp.oa.provision.model.*; import eu.dnetlib.dhp.schema.common.*; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.Result;