implementation of gtr2Publications plugin

This commit is contained in:
Michele Artini 2024-09-16 14:16:56 +02:00
parent 8e7ef79ce0
commit bb9cee4f40
5 changed files with 283 additions and 30 deletions

View File

@ -22,6 +22,7 @@ import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.base.BaseCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.gtr2.Gtr2PublicationsCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin; import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
@ -58,7 +59,7 @@ public class CollectorWorker extends ReportingJob {
public void collect() throws UnknownCollectorPluginException, CollectorException, IOException { public void collect() throws UnknownCollectorPluginException, CollectorException, IOException {
final String outputPath = mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME; final String outputPath = this.mdStoreVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
log.info("outputPath path is {}", outputPath); log.info("outputPath path is {}", outputPath);
final CollectorPlugin plugin = getCollectorPlugin(); final CollectorPlugin plugin = getCollectorPlugin();
@ -68,36 +69,36 @@ public class CollectorWorker extends ReportingJob {
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(
fileSystem.getConf(), this.fileSystem.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
SequenceFile.Writer.file(new Path(outputPath)), .keyClass(IntWritable.class),
SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer
SequenceFile.Writer.valueClass(Text.class), .valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) { SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final IntWritable key = new IntWritable(counter.get()); final IntWritable key = new IntWritable(counter.get());
final Text value = new Text(); final Text value = new Text();
plugin plugin
.collect(api, report) .collect(this.api, this.report)
.forEach( .forEach(content -> {
content -> {
key.set(counter.getAndIncrement()); key.set(counter.getAndIncrement());
value.set(content); value.set(content);
try { try {
writer.append(key, value); writer.append(key, value);
} catch (Throwable e) { } catch (final Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} catch (Throwable e) { } catch (final Throwable e) {
report.put(e.getClass().getName(), e.getMessage()); this.report.put(e.getClass().getName(), e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
} finally { } finally {
shutdown(); shutdown();
report.ongoing(counter.longValue(), counter.longValue()); this.report.ongoing(counter.longValue(), counter.longValue());
} }
} }
private void scheduleReport(AtomicInteger counter) { private void scheduleReport(final AtomicInteger counter) {
schedule(new ReporterCallback() { schedule(new ReporterCallback() {
@Override @Override
public Long getCurrent() { public Long getCurrent() {
return counter.longValue(); return counter.longValue();
@ -112,33 +113,35 @@ public class CollectorWorker extends ReportingJob {
private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException { private CollectorPlugin getCollectorPlugin() throws UnknownCollectorPluginException {
switch (CollectorPlugin.NAME.valueOf(api.getProtocol())) { switch (CollectorPlugin.NAME.valueOf(this.api.getProtocol())) {
case oai: case oai:
return new OaiCollectorPlugin(clientParams); return new OaiCollectorPlugin(this.clientParams);
case rest_json2xml: case rest_json2xml:
return new RestCollectorPlugin(clientParams); return new RestCollectorPlugin(this.clientParams);
case file: case file:
return new FileCollectorPlugin(fileSystem); return new FileCollectorPlugin(this.fileSystem);
case fileGzip: case fileGzip:
return new FileGZipCollectorPlugin(fileSystem); return new FileGZipCollectorPlugin(this.fileSystem);
case baseDump: case baseDump:
return new BaseCollectorPlugin(this.fileSystem); return new BaseCollectorPlugin(this.fileSystem);
case gtr2Publications:
return new Gtr2PublicationsCollectorPlugin(this.clientParams);
case other: case other:
final CollectorPlugin.NAME.OTHER_NAME plugin = Optional final CollectorPlugin.NAME.OTHER_NAME plugin = Optional
.ofNullable(api.getParams().get("other_plugin_type")) .ofNullable(this.api.getParams().get("other_plugin_type"))
.map(CollectorPlugin.NAME.OTHER_NAME::valueOf) .map(CollectorPlugin.NAME.OTHER_NAME::valueOf)
.orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type")); .orElseThrow(() -> new IllegalArgumentException("invalid other_plugin_type"));
switch (plugin) { switch (plugin) {
case mdstore_mongodb_dump: case mdstore_mongodb_dump:
return new MongoDbDumpCollectorPlugin(fileSystem); return new MongoDbDumpCollectorPlugin(this.fileSystem);
case mdstore_mongodb: case mdstore_mongodb:
return new MDStoreCollectorPlugin(); return new MDStoreCollectorPlugin();
default: default:
throw new UnknownCollectorPluginException("plugin is not managed: " + plugin); throw new UnknownCollectorPluginException("plugin is not managed: " + plugin);
} }
default: default:
throw new UnknownCollectorPluginException("protocol is not managed: " + api.getProtocol()); throw new UnknownCollectorPluginException("protocol is not managed: " + this.api.getProtocol());
} }
} }

View File

@ -11,7 +11,7 @@ public interface CollectorPlugin {
enum NAME { enum NAME {
oai, other, rest_json2xml, file, fileGzip, baseDump; oai, other, rest_json2xml, file, fileGzip, baseDump, gtr2Publications;
public enum OTHER_NAME { public enum OTHER_NAME {
mdstore_mongodb_dump, mdstore_mongodb mdstore_mongodb_dump, mdstore_mongodb

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.collection.plugin.gtr2;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class Gtr2PublicationsCollectorPlugin implements CollectorPlugin {
private final HttpClientParams clientParams;
public Gtr2PublicationsCollectorPlugin(final HttpClientParams clientParams) {
this.clientParams = clientParams;
}
@Override
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
final String baseUrl = api.getBaseUrl();
final String startPage = api.getParams().get("startPage");
final String endPage = api.getParams().get("endPage");
final String fromDate = api.getParams().get("fromDate");
if ((fromDate != null) && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) { throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate); }
final Iterator<String> iterator = new Gtr2PublicationsIterator(baseUrl, fromDate, startPage, endPage, this.clientParams);
final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
return StreamSupport.stream(spliterator, false);
}
}

View File

@ -0,0 +1,210 @@
package eu.dnetlib.dhp.collection.plugin.gtr2;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class Gtr2PublicationsIterator implements Iterator<String> {
public static final int PAGE_SIZE = 20;
private static final Logger log = LoggerFactory.getLogger(Gtr2PublicationsIterator.class);
private final HttpConnector2 connector;
private static final DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
private static final int MAX_ATTEMPTS = 10;
private final String baseUrl;
private int currPage;
private int endPage;
private boolean incremental = false;
private DateTime fromDate;
private final Map<String, String> cache = new HashMap<>();
private final Queue<String> queue = new LinkedList<>();
private String nextElement;
public Gtr2PublicationsIterator(final String baseUrl, final String fromDate, final String startPage, final String endPage,
final HttpClientParams clientParams)
throws CollectorException {
this.baseUrl = baseUrl;
this.currPage = NumberUtils.toInt(startPage, 1);
this.endPage = NumberUtils.toInt(endPage, Integer.MAX_VALUE);
this.incremental = StringUtils.isNotBlank(fromDate);
this.connector = new HttpConnector2(clientParams);
if (this.incremental) {
this.fromDate = parseDate(fromDate);
}
prepareNextElement();
}
@Override
public boolean hasNext() {
return this.nextElement != null;
}
@Override
public String next() {
try {
return this.nextElement;
} finally {
prepareNextElement();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
private void prepareNextElement() {
while ((this.currPage <= this.endPage) && this.queue.isEmpty()) {
log.debug("FETCHING PAGE + " + this.currPage + "/" + this.endPage);
this.queue.addAll(fetchPage(this.currPage++));
}
this.nextElement = this.queue.poll();
}
private List<String> fetchPage(final int pageNumber) {
final List<String> res = new ArrayList<>();
try {
final Document doc = loadURL(cleanURL(this.baseUrl + "/outcomes/publications?p=" + pageNumber), 0);
if (this.endPage == Integer.MAX_VALUE) {
this.endPage = NumberUtils.toInt(doc.valueOf("/*/@*[local-name() = 'totalPages']"));
}
for (final Object po : doc.selectNodes("//*[local-name() = 'publication']")) {
final Element mainEntity = (Element) ((Element) po).detach();
if (filterIncremental(mainEntity)) {
res.add(expandMainEntity(mainEntity));
} else {
log.debug("Skipped entity");
}
}
} catch (final Throwable e) {
log.error("Exception fetching page " + pageNumber, e);
throw new RuntimeException("Exception fetching page " + pageNumber, e);
}
return res;
}
private void addLinkedEntities(final Element master, final String relType, final Element newRoot, final Function<Document, Element> mapper) {
for (final Object o : master.selectNodes(".//*[local-name()='link']")) {
final String rel = ((Element) o).valueOf("@*[local-name()='rel']");
final String href = ((Element) o).valueOf("@*[local-name()='href']");
if (relType.equals(rel) && StringUtils.isNotBlank(href)) {
final String cacheKey = relType + "#" + href;
if (this.cache.containsKey(cacheKey)) {
try {
log.debug(" * from cache (" + relType + "): " + href);
newRoot.add(DocumentHelper.parseText(this.cache.get(cacheKey)).getRootElement());
} catch (final DocumentException e) {
log.error("Error retrieving cache element: " + cacheKey, e);
throw new RuntimeException("Error retrieving cache element: " + cacheKey, e);
}
} else {
final Document doc = loadURL(cleanURL(href), 0);
final Element elem = mapper.apply(doc);
newRoot.add(elem);
this.cache.put(cacheKey, elem.asXML());
}
}
}
}
private boolean filterIncremental(final Element e) {
if (!this.incremental || isAfter(e.valueOf("@*[local-name() = 'created']"), this.fromDate)
|| isAfter(e.valueOf("@*[local-name() = 'updated']"), this.fromDate)) {
return true;
}
return false;
}
private String expandMainEntity(final Element mainEntity) {
final Element newRoot = DocumentHelper.createElement("doc");
newRoot.add(mainEntity);
addLinkedEntities(mainEntity, "PROJECT", newRoot, this::asProjectElement);
return DocumentHelper.createDocument(newRoot).asXML();
}
private Element asProjectElement(final Document doc) {
final Element newOrg = DocumentHelper.createElement("project");
newOrg.addElement("id").setText(doc.valueOf("/*/@*[local-name()='id']"));
newOrg.addElement("code").setText(doc.valueOf("//*[local-name()='identifier' and @*[local-name()='type'] = 'RCUK']"));
newOrg.addElement("title").setText(doc.valueOf("//*[local-name()='title']"));
return newOrg;
}
private static String cleanURL(final String url) {
String cleaned = url;
if (cleaned.contains("gtr.gtr")) {
cleaned = cleaned.replace("gtr.gtr", "gtr");
}
if (cleaned.startsWith("http://")) {
cleaned = cleaned.replaceFirst("http://", "https://");
}
return cleaned;
}
private Document loadURL(final String cleanUrl, final int attempt) {
try {
log.debug(" * Downloading Url: " + cleanUrl);
final byte[] bytes = this.connector.getInputSource(cleanUrl).getBytes("UTF-8");
return DocumentHelper.parseText(new String(bytes));
} catch (final Throwable e) {
log.error("Error dowloading url: " + cleanUrl + ", attempt = " + attempt, e);
if (attempt >= MAX_ATTEMPTS) { throw new RuntimeException("Error dowloading url: " + cleanUrl, e); }
try {
Thread.sleep(60000); // I wait for a minute
} catch (final InterruptedException e1) {
throw new RuntimeException("Error dowloading url: " + cleanUrl, e);
}
return loadURL(cleanUrl, attempt + 1);
}
}
private DateTime parseDate(final String s) {
// I expect dates in the format 'yyyy-MM-dd'. See class
// eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode
return DateTime.parse(s.substring(0, s.indexOf("T")), simpleDateTimeFormatter);
}
private boolean isAfter(final String d, final DateTime fromDate) {
return parseDate(d).isAfter(fromDate);
}
}

View File

@ -20,7 +20,6 @@ import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource; import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult; import javax.xml.transform.stream.StreamResult;
import eu.dnetlib.dhp.oa.provision.model.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@ -42,6 +41,7 @@ import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc; import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag; import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.schema.common.*; import eu.dnetlib.dhp.schema.common.*;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;