diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java index 250c8af1d..5df3aad69 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseAnalyzerJob.java @@ -26,6 +26,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import org.dom4j.Attribute; import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.DocumentHelper; import org.dom4j.Element; import org.dom4j.Node; import org.slf4j.Logger; @@ -46,17 +48,18 @@ public class BaseAnalyzerJob { public static void main(final String[] args) throws Exception { final String jsonConfiguration = IOUtils - .toString(BaseAnalyzerJob.class - .getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json")); + .toString( + BaseAnalyzerJob.class + .getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); @@ -72,11 +75,11 @@ public class BaseAnalyzerJob { } private static void processBaseRecords(final SparkSession spark, - final String inputPath, - final String outputPath) throws IOException { + final String inputPath, + final String outputPath) throws IOException { try (final FileSystem fs = FileSystem.get(new Configuration()); - final AggregatorReport report = new AggregatorReport()) { + final AggregatorReport report = new AggregatorReport()) { final Map fields = new HashMap<>(); final Map types = new HashMap<>(); final Map collections = new HashMap<>(); @@ -94,12 +97,12 @@ public class BaseAnalyzerJob { } private static void analyze(final FileSystem fs, - final String inputPath, - final Map fields, - final Map types, - final Map collections, - final Map totals, - final AggregatorReport report) throws JsonProcessingException, IOException { + final String inputPath, + final Map fields, + final Map types, + final Map collections, + final Map totals, + final AggregatorReport report) throws JsonProcessingException, IOException, DocumentException { final AtomicLong recordsCounter = new AtomicLong(0); @@ -108,7 +111,7 @@ public class BaseAnalyzerJob { final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report); while (iteraror.hasNext()) { - final Document record = iteraror.next(); + final Document record = DocumentHelper.parseText(iteraror.next()); final long i = recordsCounter.incrementAndGet(); if ((i % 10000) == 0) { @@ -160,11 +163,14 @@ public class BaseAnalyzerJob { } private static void saveReport(final FileSystem fs, final String outputPath, final Map fields) - throws JsonProcessingException, IOException { + throws JsonProcessingException, IOException { try (final SequenceFile.Writer writer = SequenceFile - .createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer - .keyClass(IntWritable.class), SequenceFile.Writer - .valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) { + .createWriter( + fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer + .keyClass(IntWritable.class), + SequenceFile.Writer + .valueClass(Text.class), + SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) { final Text key = new Text(); final Text value = new Text(); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java index 598cc437c..c2cd9d885 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIterator.java @@ -4,11 +4,19 @@ package eu.dnetlib.dhp.collection.plugin.base; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.StringWriter; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLEventWriter; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.events.EndElement; +import javax.xml.stream.events.StartElement; +import javax.xml.stream.events.XMLEvent; + import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.CompressorInputStream; @@ -16,22 +24,21 @@ import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.dom4j.Document; -import org.dom4j.DocumentHelper; -import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.common.aggregation.AggregatorReport; -public class BaseCollectorIterator implements Iterator { +public class BaseCollectorIterator implements Iterator { - private Object nextElement; + private String nextElement; - private final BlockingQueue queue = new LinkedBlockingQueue<>(20); + private final BlockingQueue queue = new LinkedBlockingQueue<>(20); private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class); + private static final String END_ELEM = "__END__"; + public BaseCollectorIterator(final FileSystem fs, final Path filePath, final AggregatorReport report) { new Thread(() -> importHadoopFile(fs, filePath, report)).start(); try { @@ -52,13 +59,13 @@ public class BaseCollectorIterator implements Iterator { @Override public synchronized boolean hasNext() { - return (this.nextElement != null) && (this.nextElement instanceof Document); + return (this.nextElement != null) & !END_ELEM.equals(this.nextElement); } @Override - public synchronized Document next() { + public synchronized String next() { try { - return this.nextElement instanceof Document ? (Document) this.nextElement : null; + return END_ELEM.equals(this.nextElement) ? null : this.nextElement; } finally { try { this.nextElement = this.queue.take(); @@ -92,6 +99,9 @@ public class BaseCollectorIterator implements Iterator { private void importTarStream(final TarArchiveInputStream tarInputStream, final AggregatorReport report) { long count = 0; + final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance(); + final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newInstance(); + try { TarArchiveEntry entry; while ((entry = (TarArchiveEntry) tarInputStream.getNextEntry()) != null) { @@ -109,24 +119,46 @@ public class BaseCollectorIterator implements Iterator { final CompressorInputStream bzipInput = new CompressorStreamFactory() .createCompressorInputStream(bzipBis)) { - final String xml = IOUtils.toString(new InputStreamReader(bzipInput)); + final XMLEventReader reader = xmlInputFactory.createXMLEventReader(bzipInput); - final Document doc = DocumentHelper.parseText(xml); + XMLEventWriter eventWriter = null; + StringWriter xmlWriter = null; - for (final Object o : doc - .selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) { - if (o instanceof Element) { - final Element newRoot = (Element) ((Element) o).detach(); - final Document newDoc = DocumentHelper.createDocument(newRoot); - this.queue.put(newDoc); - count++; + while (reader.hasNext()) { + final XMLEvent nextEvent = reader.nextEvent(); + + if (nextEvent.isStartElement()) { + final StartElement startElement = nextEvent.asStartElement(); + if ("record".equals(startElement.getName().getLocalPart())) { + xmlWriter = new StringWriter(); + eventWriter = xmlOutputFactory.createXMLEventWriter(xmlWriter); + } } + + if (eventWriter != null) { + eventWriter.add(nextEvent); + } + + if (nextEvent.isEndElement()) { + final EndElement endElement = nextEvent.asEndElement(); + if ("record".equals(endElement.getName().getLocalPart())) { + eventWriter.flush(); + eventWriter.close(); + + this.queue.put(xmlWriter.toString()); + + eventWriter = null; + xmlWriter = null; + count++; + } + } + } } } } - this.queue.put("__END__"); // I ADD A NOT ELEMENT OBJECT TO INDICATE THE END OF THE QUEUE + this.queue.put(END_ELEM); // TO INDICATE THE END OF THE QUEUE } catch (final Throwable e) { log.error("Error processing BASE records", e); report.put(e.getClass().getName(), e.getMessage()); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java index 3e4309042..e203ce30c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorPlugin.java @@ -1,8 +1,6 @@ package eu.dnetlib.dhp.collection.plugin.base; -import java.io.IOException; -import java.io.StringWriter; import java.util.Iterator; import java.util.Optional; import java.util.Spliterator; @@ -12,9 +10,6 @@ import java.util.stream.StreamSupport; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.dom4j.Document; -import org.dom4j.io.OutputFormat; -import org.dom4j.io.XMLWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,15 +50,14 @@ public class BaseCollectorPlugin implements CollectorPlugin { throw new CollectorException(e); } - final Iterator iterator = new BaseCollectorIterator(this.fs, filePath, report); - final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); + final Iterator iterator = new BaseCollectorIterator(this.fs, filePath, report); + final Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED); return StreamSupport .stream(spliterator, false) - .filter(doc -> filterXml(doc, report)) - .map(doc -> xmlToString(doc, report)); + .filter(doc -> filterXml(doc, report)); } - private boolean filterXml(final Document doc, final AggregatorReport report) { + private boolean filterXml(final String xml, final AggregatorReport report) { // TODO Auto-generated method stub // HERE THE FILTERS ACCORDING TO THE DOCUMENTATION @@ -71,15 +65,4 @@ public class BaseCollectorPlugin implements CollectorPlugin { return true; } - private String xmlToString(final Document doc, final AggregatorReport report) { - try (final StringWriter sw = new StringWriter()) { - final XMLWriter writer = new XMLWriter(sw, OutputFormat.createPrettyPrint()); - writer.write(doc); - return writer.toString(); - } catch (final IOException e) { - report.put(e.getClass().getName(), e.getMessage()); - throw new RuntimeException("Error indenting XML record", e); - } - } - } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java index 4d7c5cdc8..368bc954e 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/plugin/base/BaseCollectorIteratorTest.java @@ -13,16 +13,15 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.dom4j.Attribute; import org.dom4j.Document; +import org.dom4j.DocumentHelper; import org.dom4j.Element; import org.dom4j.Node; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.aggregation.AggregatorReport; -@Disabled public class BaseCollectorIteratorTest { @Test @@ -37,7 +36,8 @@ public class BaseCollectorIteratorTest { final Set types = new HashSet<>(); while (iterator.hasNext()) { - final Document record = iterator.next(); + + final Document record = DocumentHelper.parseText(iterator.next()); count++;