Master branch updates from beta September 2023 #337

Manually merged
claudio.atzori merged 1271 commits from beta into master 2023-09-06 11:31:09 +02:00
7 changed files with 285 additions and 267 deletions
Showing only changes of commit 929b145130 - Show all commits

View File

@ -7,8 +7,6 @@ import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@ -21,6 +19,8 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.ReporterCallback;
import eu.dnetlib.dhp.aggregation.common.ReportingJob;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.file.FileGZipCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MDStoreCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.mongodb.MongoDbDumpCollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.file;
import java.io.BufferedInputStream;
@ -9,69 +10,71 @@ import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.plugin.utils.XMLIterator;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractSplittedRecordPlugin implements CollectorPlugin {
private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
public static final String SPLIT_ON_ELEMENT = "splitOnElement";
public static final String SPLIT_ON_ELEMENT = "splitOnElement";
private final FileSystem fileSystem;
private final FileSystem fileSystem;
public AbstractSplittedRecordPlugin(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
public AbstractSplittedRecordPlugin(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
@Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
@Override
public Stream<String> collect(ApiDescriptor api, AggregatorReport report) throws CollectorException {
// get path to file
final Path filePath = Optional
.ofNullable(api.getBaseUrl())
.map(Path::new)
.orElseThrow( () -> new CollectorException("missing baseUrl"));
// get path to file
final Path filePath = Optional
.ofNullable(api.getBaseUrl())
.map(Path::new)
.orElseThrow(() -> new CollectorException("missing baseUrl"));
log.info("baseUrl: {}", filePath);
log.info("baseUrl: {}", filePath);
// check that path to file exists
try {
if (!fileSystem.exists(filePath)) {
throw new CollectorException("path does not exist: " + filePath);
}
} catch (IOException e) {
throw new CollectorException(e);
}
// check that path to file exists
try {
if (!fileSystem.exists(filePath)) {
throw new CollectorException("path does not exist: " + filePath);
}
} catch (IOException e) {
throw new CollectorException(e);
}
// get split element
final String splitOnElement = Optional
.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
.orElseThrow(() -> new CollectorException(String.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
// get split element
final String splitOnElement = Optional
.ofNullable(api.getParams().get(SPLIT_ON_ELEMENT))
.orElseThrow(
() -> new CollectorException(String
.format("missing parameter '%s', required by the AbstractSplittedRecordPlugin", SPLIT_ON_ELEMENT)));
log.info("splitOnElement: {}", splitOnElement);
log.info("splitOnElement: {}", splitOnElement);
final BufferedInputStream bis = getBufferedInputStream(filePath);
final BufferedInputStream bis = getBufferedInputStream(filePath);
Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
Iterator<String> xmlIterator = new XMLIterator(splitOnElement, bis);
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
false
);
}
return StreamSupport
.stream(
Spliterators.spliteratorUnknownSize(xmlIterator, Spliterator.ORDERED),
false);
}
abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException;
abstract protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException;
public FileSystem getFileSystem() {
return fileSystem;
}
}
public FileSystem getFileSystem() {
return fileSystem;
}
}

View File

@ -1,31 +1,33 @@
package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.common.collection.CollectorException;
import java.io.BufferedInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class FileCollectorPlugin extends AbstractSplittedRecordPlugin {
private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class);
private static final Logger log = LoggerFactory.getLogger(FileCollectorPlugin.class);
public FileCollectorPlugin(FileSystem fileSystem) {
super(fileSystem);
}
public FileCollectorPlugin(FileSystem fileSystem) {
super(fileSystem);
}
@Override
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
@Override
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
log.info("filePath: {}", filePath);
log.info("filePath: {}", filePath);
try {
FileSystem fs = super.getFileSystem();
return new BufferedInputStream(fs.open(filePath));
} catch (Exception e) {
throw new CollectorException("Error reading file " + filePath, e);
}
}
}
try {
FileSystem fs = super.getFileSystem();
return new BufferedInputStream(fs.open(filePath));
} catch (Exception e) {
throw new CollectorException("Error reading file " + filePath, e);
}
}
}

View File

@ -1,33 +1,35 @@
package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.common.collection.CollectorException;
import java.io.BufferedInputStream;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.util.zip.GZIPInputStream;
import eu.dnetlib.dhp.common.collection.CollectorException;
public class FileGZipCollectorPlugin extends AbstractSplittedRecordPlugin {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPlugin.class);
public FileGZipCollectorPlugin(FileSystem fileSystem) {
super(fileSystem);
}
public FileGZipCollectorPlugin(FileSystem fileSystem) {
super(fileSystem);
}
@Override
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
@Override
protected BufferedInputStream getBufferedInputStream(final Path filePath) throws CollectorException {
log.info("filePath: {}", filePath);
log.info("filePath: {}", filePath);
try {
FileSystem fs = super.getFileSystem();
GZIPInputStream stream = new GZIPInputStream(fs.open(filePath));
return new BufferedInputStream(stream);
} catch (Exception e) {
throw new CollectorException("Error reading file " + filePath, e);
}
}
try {
FileSystem fs = super.getFileSystem();
GZIPInputStream stream = new GZIPInputStream(fs.open(filePath));
return new BufferedInputStream(stream);
} catch (Exception e) {
throw new CollectorException("Error reading file " + filePath, e);
}
}
}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.utils;
import java.io.InputStream;
@ -23,148 +24,154 @@ import org.apache.commons.logging.LogFactory;
public class XMLIterator implements Iterator<String> {
private static final Log log = LogFactory.getLog(XMLIterator.class);
private static final Log log = LogFactory.getLog(XMLIterator.class);
private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
private ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
@Override
protected XMLInputFactory initialValue() {
return XMLInputFactory.newInstance();
}
};
@Override
protected XMLInputFactory initialValue() {
return XMLInputFactory.newInstance();
}
};
private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
private ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
@Override
protected XMLOutputFactory initialValue() {
return XMLOutputFactory.newInstance();
}
};
@Override
protected XMLOutputFactory initialValue() {
return XMLOutputFactory.newInstance();
}
};
private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
private ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
@Override
protected XMLEventFactory initialValue() {
return XMLEventFactory.newInstance();
}
};
@Override
protected XMLEventFactory initialValue() {
return XMLEventFactory.newInstance();
}
};
public static final String UTF_8 = "UTF-8";
public static final String UTF_8 = "UTF-8";
final XMLEventReader parser;
final XMLEventReader parser;
private XMLEvent current = null;
private XMLEvent current = null;
private String element;
private String element;
private InputStream inputStream;
private InputStream inputStream;
public XMLIterator(final String element, final InputStream inputStream) {
super();
this.element = element;
this.inputStream = inputStream;
this.parser = getParser();
try {
this.current = findElement(parser);
} catch (XMLStreamException e) {
log.warn("cannot init parser position. No element found: " + element);
current = null;
}
}
public XMLIterator(final String element, final InputStream inputStream) {
super();
this.element = element;
this.inputStream = inputStream;
this.parser = getParser();
try {
this.current = findElement(parser);
} catch (XMLStreamException e) {
log.warn("cannot init parser position. No element found: " + element);
current = null;
}
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public String next() {
String result = null;
try {
result = copy(parser);
current = findElement(parser);
return result;
} catch (XMLStreamException e) {
throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
}
}
@Override
public String next() {
String result = null;
try {
result = copy(parser);
current = findElement(parser);
return result;
} catch (XMLStreamException e) {
throw new RuntimeException(String.format("error copying xml, built so far: '%s'", result), e);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("finally")
private String copy(final XMLEventReader parser) throws XMLStreamException {
final StringWriter result = new StringWriter();
try {
final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
final StartElement start = current.asStartElement();
final StartElement newRecord = eventFactory.get().createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
@SuppressWarnings("finally")
private String copy(final XMLEventReader parser) throws XMLStreamException {
final StringWriter result = new StringWriter();
try {
final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(result);
final StartElement start = current.asStartElement();
final StartElement newRecord = eventFactory
.get()
.createStartElement(start.getName(), start.getAttributes(), start.getNamespaces());
// new root record
writer.add(newRecord);
// new root record
writer.add(newRecord);
// copy the rest as it is
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
// copy the rest as it is
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
// TODO: replace with depth tracking instead of close tag tracking.
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
writer.add(event);
break;
}
// TODO: replace with depth tracking instead of close tag tracking.
if (event.isEndElement() && event.asEndElement().getName().getLocalPart().equals(element)) {
writer.add(event);
break;
}
writer.add(event);
}
writer.close();
} finally {
return result.toString();
}
}
writer.add(event);
}
writer.close();
} finally {
return result.toString();
}
}
/**
* Looks for the next occurrence of the splitter element.
*
* @param parser
* @return
* @throws XMLStreamException
*/
private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
/**
* Looks for the next occurrence of the splitter element.
*
* @param parser
* @return
* @throws XMLStreamException
*/
private XMLEvent findElement(final XMLEventReader parser) throws XMLStreamException {
/*
* if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
*/
/*
* if (current != null && element.equals(current.asStartElement().getName().getLocalPart())) { return current; }
*/
XMLEvent peek = parser.peek();
if (peek != null && peek.isStartElement()) {
String name = peek.asStartElement().getName().getLocalPart();
if (element.equals(name)) { return peek; }
}
XMLEvent peek = parser.peek();
if (peek != null && peek.isStartElement()) {
String name = peek.asStartElement().getName().getLocalPart();
if (element.equals(name)) {
return peek;
}
}
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
if (event != null && event.isStartElement()) {
String name = event.asStartElement().getName().getLocalPart();
if (element.equals(name)) { return event; }
}
}
return null;
}
while (parser.hasNext()) {
final XMLEvent event = parser.nextEvent();
if (event != null && event.isStartElement()) {
String name = event.asStartElement().getName().getLocalPart();
if (element.equals(name)) {
return event;
}
}
}
return null;
}
private XMLEventReader getParser() {
try {
return inputFactory.get().createXMLEventReader(sanitize(inputStream));
} catch (XMLStreamException e) {
throw new RuntimeException(e);
}
}
private XMLEventReader getParser() {
try {
return inputFactory.get().createXMLEventReader(sanitize(inputStream));
} catch (XMLStreamException e) {
throw new RuntimeException(e);
}
}
private Reader sanitize(final InputStream in) {
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
return new InputStreamReader(in, charsetDecoder);
}
private Reader sanitize(final InputStream in) {
final CharsetDecoder charsetDecoder = Charset.forName(UTF_8).newDecoder();
charsetDecoder.onMalformedInput(CodingErrorAction.REPLACE);
charsetDecoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
return new InputStreamReader(in, charsetDecoder);
}
}
}

View File

@ -1,9 +1,10 @@
package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import net.bytebuddy.asm.Advice;
import java.io.IOException;
import java.util.HashMap;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@ -13,48 +14,48 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.stream.Stream;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import net.bytebuddy.asm.Advice;
public class FileCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private final ApiDescriptor api = new ApiDescriptor();
private final ApiDescriptor api = new ApiDescriptor();
private FileCollectorPlugin plugin;
private FileCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "repository";
private static final String SPLIT_ON_ELEMENT = "repository";
@BeforeEach
public void setUp() throws IOException {
@BeforeEach
public void setUp() throws IOException {
final String gzipFile = this
.getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml")
.getFile();
final String gzipFile = this
.getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml")
.getFile();
api.setBaseUrl(gzipFile);
api.setBaseUrl(gzipFile);
HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT);
HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT);
api.setParams(params);
api.setParams(params);
FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileCollectorPlugin(fs);
}
FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileCollectorPlugin(fs);
}
@Test
void test() throws CollectorException {
@Test
void test() throws CollectorException {
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
stream.limit(10).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
log.info(s);
});
}
stream.limit(10).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
log.info(s);
});
}
}

View File

@ -1,8 +1,13 @@
package eu.dnetlib.dhp.collection.plugin.file;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@ -13,53 +18,51 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Stream;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.collection.CollectorException;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class FileGZipCollectorPluginTest {
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private static final Logger log = LoggerFactory.getLogger(FileGZipCollectorPluginTest.class);
private final ApiDescriptor api = new ApiDescriptor();
private final ApiDescriptor api = new ApiDescriptor();
private FileGZipCollectorPlugin plugin;
private FileGZipCollectorPlugin plugin;
private static final String SPLIT_ON_ELEMENT = "repository";
private static final String SPLIT_ON_ELEMENT = "repository";
@BeforeEach
public void setUp() throws IOException {
@BeforeEach
public void setUp() throws IOException {
final String gzipFile = Objects.requireNonNull(this
.getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz"))
.getFile();
final String gzipFile = Objects
.requireNonNull(
this
.getClass()
.getResource("/eu/dnetlib/dhp/collection/plugin/file/opendoar.xml.gz"))
.getFile();
api.setBaseUrl(gzipFile);
api.setBaseUrl(gzipFile);
HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT);
HashMap<String, String> params = new HashMap<>();
params.put("splitOnElement", SPLIT_ON_ELEMENT);
api.setParams(params);
api.setParams(params);
FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileGZipCollectorPlugin(fs);
}
FileSystem fs = FileSystem.get(new Configuration());
plugin = new FileGZipCollectorPlugin(fs);
}
@Test
void test() throws CollectorException {
@Test
void test() throws CollectorException {
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
final Stream<String> stream = plugin.collect(api, new AggregatorReport());
stream.limit(10).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
log.info(s);
});
}
stream.limit(10).forEach(s -> {
Assertions.assertTrue(s.length() > 0);
log.info(s);
});
}
}