minor fixes

Michele Artini 2024-02-14 11:39:37 +01:00
parent 963a2500be
commit ddd6a7ceb3
6 changed files with 75 additions and 50 deletions

BaseAnalyzerJob.java

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.base;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@@ -9,7 +10,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
@@ -47,17 +48,18 @@ public class BaseAnalyzerJob {
public static void main(final String[] args) throws Exception {
final String jsonConfiguration = IOUtils
-.toString(GenerateRorActionSetJob.class
-.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
+.toString(
+GenerateRorActionSetJob.class
+.getResourceAsStream("/eu/dnetlib/dhp/collection/plugin/base/action_set_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
-.ofNullable(parser.get("isSparkSessionManaged"))
-.map(Boolean::valueOf)
-.orElse(Boolean.TRUE);
+.ofNullable(parser.get("isSparkSessionManaged"))
+.map(Boolean::valueOf)
+.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
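For reference, a minimal, self-contained sketch of the Optional-based flag handling above; the plain map of parsed arguments is a hypothetical stand-in for ArgumentApplicationParser.

```java
import java.util.Map;
import java.util.Optional;

public class SparkFlagSketch {

	public static void main(final String[] args) {
		// Hypothetical stand-in for parser.get("isSparkSessionManaged")
		final Map<String, String> parsed = Map.of();

		// A missing value falls back to TRUE, mirroring the hunk above
		final Boolean isSparkSessionManaged = Optional
			.ofNullable(parsed.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);

		System.out.println("isSparkSessionManaged: " + isSparkSessionManaged);
	}
}
```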
@@ -73,41 +75,47 @@ public class BaseAnalyzerJob {
}
private static void processBaseRecords(final SparkSession spark,
-final String inputPath,
-final String outputPath) throws IOException {
+final String inputPath,
+final String outputPath) throws IOException {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
-try (final FileSystem fs = FileSystem.get(new Configuration()); final AggregatorReport report = new AggregatorReport()) {
-final AtomicInteger recordsCounter = new AtomicInteger(0);
-final Map<String, AtomicInteger> fields = new HashMap<>();
-final Map<String, AtomicInteger> types = new HashMap<>();
-final Map<String, AtomicInteger> collections = new HashMap<>();
+try (final FileSystem fs = FileSystem.get(new Configuration());
+final AggregatorReport report = new AggregatorReport()) {
+final Map<String, AtomicLong> fields = new HashMap<>();
+final Map<String, AtomicLong> types = new HashMap<>();
+final Map<String, AtomicLong> collections = new HashMap<>();
+final Map<String, AtomicLong> totals = new HashMap<>();
+analyze(fs, inputPath, fields, types, collections, totals, report);
-analyze(fs, inputPath, recordsCounter, fields, types, collections, report);
-saveReport(fs, outputPath + "/total", Map.of("#records", recordsCounter));
saveReport(fs, outputPath + "/fields", fields);
saveReport(fs, outputPath + "/types", types);
saveReport(fs, outputPath + "/collections", collections);
+saveReport(fs, outputPath + "/totals", totals);
} catch (final Throwable e) {
throw new RuntimeException(e);
}
}
private static void analyze(final FileSystem fs,
-final String inputPath,
-final AtomicInteger recordsCounter,
-final Map<String, AtomicInteger> fields,
-final Map<String, AtomicInteger> types,
-final Map<String, AtomicInteger> collections,
-final AggregatorReport report) throws JsonProcessingException, IOException {
+final String inputPath,
+final Map<String, AtomicLong> fields,
+final Map<String, AtomicLong> types,
+final Map<String, AtomicLong> collections,
+final Map<String, AtomicLong> totals,
+final AggregatorReport report) throws JsonProcessingException, IOException {
+final AtomicLong recordsCounter = new AtomicLong(0);
+totals.put("Records", recordsCounter);
final BaseCollectorIterator iteraror = new BaseCollectorIterator(fs, new Path(inputPath), report);
while (iteraror.hasNext()) {
final Document record = iteraror.next();
-final int i = recordsCounter.incrementAndGet();
+final long i = recordsCounter.incrementAndGet();
if ((i % 10000) == 0) {
log.info("# Read records: " + i);
}
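A condensed sketch of how the reworked counters fit together: every metric, including the record total, now lives in a Map<String, AtomicLong>, and analyze(...) registers its own counter in the totals map. The class, the fake record, and the print-based saveReport below are illustrative stand-ins, not the HDFS-backed originals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class CounterWiringSketch {

	public static void main(final String[] args) {
		// One AtomicLong map per report, as in the reworked processBaseRecords
		final Map<String, AtomicLong> fields = new HashMap<>();
		final Map<String, AtomicLong> types = new HashMap<>();
		final Map<String, AtomicLong> collections = new HashMap<>();
		final Map<String, AtomicLong> totals = new HashMap<>();

		analyze(fields, types, collections, totals);

		saveReport("fields", fields);
		saveReport("types", types);
		saveReport("collections", collections);
		saveReport("totals", totals);
	}

	// Simplified stand-in for analyze(...): the real method iterates BASE records read from HDFS
	private static void analyze(final Map<String, AtomicLong> fields,
		final Map<String, AtomicLong> types,
		final Map<String, AtomicLong> collections,
		final Map<String, AtomicLong> totals) {
		final AtomicLong recordsCounter = new AtomicLong(0);
		totals.put("Records", recordsCounter);

		// Pretend a single record with one field, one type and one collection was read
		recordsCounter.incrementAndGet();
		fields.computeIfAbsent("dc:title", k -> new AtomicLong()).incrementAndGet();
		types.computeIfAbsent("article", k -> new AtomicLong()).incrementAndGet();
		collections.computeIfAbsent("ftexample", k -> new AtomicLong()).incrementAndGet();
	}

	// Simplified stand-in for saveReport(...): prints instead of writing a SequenceFile
	private static void saveReport(final String name, final Map<String, AtomicLong> counters) {
		counters.forEach((k, v) -> System.out.println(name + " / " + k + ": " + v));
	}
}
```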
@@ -142,27 +150,30 @@ public class BaseAnalyzerJob {
}
}
-private static void incrementMapCounter(final Map<String, AtomicInteger> map, final String key) {
+private static void incrementMapCounter(final Map<String, AtomicLong> map, final String key) {
if (StringUtils.isNotBlank(key)) {
if (map.containsKey(key)) {
map.get(key).incrementAndGet();
} else {
-map.put(key, new AtomicInteger(1));
+map.put(key, new AtomicLong(1));
}
}
}
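The containsKey/put branch above is fine for the sequential loop in analyze(...); a more compact equivalent is Map.computeIfAbsent, sketched below with illustrative class and method names. On a ConcurrentHashMap this form is also safe under concurrent callers, which the two-step branch is not.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;

public class MapCounterSketch {

	// Same behaviour as incrementMapCounter above, expressed with computeIfAbsent
	static void incrementMapCounter(final Map<String, AtomicLong> map, final String key) {
		if (StringUtils.isNotBlank(key)) {
			map.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet();
		}
	}

	public static void main(final String[] args) {
		final Map<String, AtomicLong> counters = new HashMap<>();
		incrementMapCounter(counters, "dc:creator");
		incrementMapCounter(counters, "dc:creator");
		incrementMapCounter(counters, " "); // blank keys are ignored, as above
		System.out.println(counters); // {dc:creator=2}
	}
}
```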
-private static void saveReport(final FileSystem fs, final String outputPath, final Map<String, AtomicInteger> fields)
-throws JsonProcessingException, IOException {
+private static void saveReport(final FileSystem fs, final String outputPath, final Map<String, AtomicLong> fields)
+throws JsonProcessingException, IOException {
try (final SequenceFile.Writer writer = SequenceFile
-.createWriter(fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
-.keyClass(IntWritable.class), SequenceFile.Writer
-.valueClass(Text.class), SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
+.createWriter(
+fs.getConf(), SequenceFile.Writer.file(new Path(outputPath)), SequenceFile.Writer
+.keyClass(IntWritable.class),
+SequenceFile.Writer
+.valueClass(Text.class),
+SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
final Text key = new Text();
final Text value = new Text();
-for (final Entry<String, AtomicInteger> e : fields.entrySet()) {
+for (final Entry<String, AtomicLong> e : fields.entrySet()) {
key.set(e.getKey());
value.set(e.getKey() + ": " + e.getValue());
try {
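For context, a self-contained sketch of writing one of these reports as a block-compressed SequenceFile. Note that the reformatted call above still declares IntWritable as the key class while the loop fills a Text key; the sketch assumes Text for both key and value classes, and the method and path names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DeflateCodec;

public class SaveReportSketch {

	// Writes each "<key>: <count>" pair into a block-compressed SequenceFile
	static void saveReport(final Configuration conf, final String outputPath,
		final Map<String, AtomicLong> counters) throws Exception {
		try (SequenceFile.Writer writer = SequenceFile
			.createWriter(
				conf,
				SequenceFile.Writer.file(new Path(outputPath)),
				SequenceFile.Writer.keyClass(Text.class),
				SequenceFile.Writer.valueClass(Text.class),
				SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DeflateCodec()))) {
			final Text key = new Text();
			final Text value = new Text();
			for (final Map.Entry<String, AtomicLong> e : counters.entrySet()) {
				key.set(e.getKey());
				value.set(e.getKey() + ": " + e.getValue());
				writer.append(key, value);
			}
		}
	}
}
```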

BaseCollectorIterator.java

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.base;
import java.io.BufferedInputStream;
@@ -72,7 +73,7 @@ public class BaseCollectorIterator implements Iterator<Document> {
log.info("I start to read the TAR stream");
try (InputStream origInputStream = fs.open(filePath);
-final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) {
+final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) {
importTarStream(tarInputStream, report);
} catch (final Throwable e) {
throw new RuntimeException("Error processing BASE records", e);
@@ -81,7 +82,7 @@ public class BaseCollectorIterator implements Iterator<Document> {
private void importTestFile(final String resourcePath, final AggregatorReport report) {
try (final InputStream origInputStream = BaseCollectorIterator.class.getResourceAsStream(resourcePath);
-final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) {
+final TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream)) {
importTarStream(tarInputStream, report);
} catch (final Throwable e) {
throw new RuntimeException("Error processing BASE records", e);
@@ -104,14 +105,16 @@ public class BaseCollectorIterator implements Iterator<Document> {
IOUtils.readFully(tarInputStream, bzipData);
try (InputStream bzipIs = new ByteArrayInputStream(bzipData);
-final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs);
-final CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) {
+final BufferedInputStream bzipBis = new BufferedInputStream(bzipIs);
+final CompressorInputStream bzipInput = new CompressorStreamFactory()
+.createCompressorInputStream(bzipBis)) {
final String xml = IOUtils.toString(new InputStreamReader(bzipInput));
final Document doc = DocumentHelper.parseText(xml);
-for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) {
+for (final Object o : doc
+.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) {
if (o instanceof Element) {
final Element newRoot = (Element) ((Element) o).detach();
final Document newDoc = DocumentHelper.createDocument(newRoot);
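A standalone sketch of the decompress-and-split step shown above: each TAR entry holds a bzip2-compressed OAI-PMH page, and every record element is detached into its own dom4j Document. The XPath is copied from the hunk; the class and method names are illustrative.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;

public class TarEntrySketch {

	// Decompresses one bzip2 entry and returns each OAI record as its own Document
	static List<Document> extractRecords(final byte[] bzipData) throws Exception {
		final List<Document> records = new ArrayList<>();
		try (InputStream bzipIs = new ByteArrayInputStream(bzipData);
			BufferedInputStream bzipBis = new BufferedInputStream(bzipIs);
			CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream(bzipBis)) {
			final String xml = IOUtils.toString(new InputStreamReader(bzipInput));
			final Document doc = DocumentHelper.parseText(xml);
			for (final Object o : doc.selectNodes("//*[local-name()='ListRecords']/*[local-name()='record']")) {
				if (o instanceof Element) {
					// Detaching makes the record element the root of a new document
					final Element newRoot = (Element) ((Element) o).detach();
					records.add(DocumentHelper.createDocument(newRoot));
				}
			}
		}
		return records;
	}
}
```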

BaseCollectorPlugin.java

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.base;
import java.io.IOException;
@@ -29,7 +30,8 @@ public class BaseCollectorPlugin implements CollectorPlugin {
private static final Logger log = LoggerFactory.getLogger(AbstractSplittedRecordPlugin.class);
-// MAPPING AND FILTERING ARE DEFINED HERE: https://docs.google.com/document/d/1Aj-ZAV11b44MCrAAUCPiS2TUlXb6PnJEu1utCMAcCOU/edit
+// MAPPING AND FILTERING ARE DEFINED HERE:
+// https://docs.google.com/document/d/1Aj-ZAV11b44MCrAAUCPiS2TUlXb6PnJEu1utCMAcCOU/edit
public BaseCollectorPlugin(final FileSystem fs) {
this.fs = fs;
@@ -39,23 +41,26 @@ public class BaseCollectorPlugin implements CollectorPlugin {
public Stream<String> collect(final ApiDescriptor api, final AggregatorReport report) throws CollectorException {
// get path to file
final Path filePath = Optional
-.ofNullable(api.getBaseUrl())
-.map(Path::new)
-.orElseThrow(() -> new CollectorException("missing baseUrl"));
+.ofNullable(api.getBaseUrl())
+.map(Path::new)
+.orElseThrow(() -> new CollectorException("missing baseUrl"));
log.info("baseUrl: {}", filePath);
try {
-if (!this.fs.exists(filePath)) { throw new CollectorException("path does not exist: " + filePath); }
+if (!this.fs.exists(filePath)) {
+throw new CollectorException("path does not exist: " + filePath);
+}
} catch (final Throwable e) {
throw new CollectorException(e);
}
final Iterator<Document> iterator = new BaseCollectorIterator(this.fs, filePath, report);
final Spliterator<Document> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
-return StreamSupport.stream(spliterator, false)
-.filter(doc -> filterXml(doc, report))
-.map(doc -> xmlToString(doc, report));
+return StreamSupport
+.stream(spliterator, false)
+.filter(doc -> filterXml(doc, report))
+.map(doc -> xmlToString(doc, report));
}
private boolean filterXml(final Document doc, final AggregatorReport report) {
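A minimal sketch of the iterator-to-stream bridge used in collect() above: Spliterators.spliteratorUnknownSize wraps the ordered iterator and StreamSupport exposes it as a lazy sequential Stream, so records are only pulled as the consumer reads them. The string filter and map below are placeholders for filterXml(...) and xmlToString(...).

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorStreamSketch {

	public static void main(final String[] args) {
		final Iterator<String> iterator = List.of("<record/>", "", "<record/>").iterator();

		// Unknown size, ordered, sequential: mirrors the collect() hunk above
		final Spliterator<String> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
		final Stream<String> stream = StreamSupport.stream(spliterator, false);

		// Placeholder filter/map standing in for filterXml(...) and xmlToString(...)
		stream
			.filter(xml -> !xml.isEmpty())
			.map(String::trim)
			.forEach(System.out::println);
	}
}
```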

SparkTransformBioDatabaseToOAF.scala

@@ -2,7 +2,7 @@ package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.collection.CollectionUtils
-import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH,MDSTORE_SIZE_PATH}
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
@@ -11,6 +11,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import eu.dnetlib.dhp.utils.DHPUtils.{MAPPER, writeHdfsFile}
object SparkTransformBioDatabaseToOAF {
def main(args: Array[String]): Unit = {

BaseCollectorIteratorTest.java

@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.collection.plugin.base;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -14,15 +15,14 @@ import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Node;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
@ExtendWith(MockitoExtension.class)
@Disabled
public class BaseCollectorIteratorTest {
@Test
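For reference, a minimal JUnit 5 skeleton showing how the two annotations above behave: @Disabled makes the whole class report as skipped, while @ExtendWith(MockitoExtension.class) keeps the Mockito extension registered for when the class is re-enabled. The class name and test body are placeholders; the real test exercises BaseCollectorIterator.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
@Disabled // the whole class is skipped until this annotation is removed
class AnnotationSkeletonTest {

	@Test
	void placeholder() {
		// Hypothetical assertion standing in for the real record-parsing checks
		assertEquals(2, 1 + 1);
	}
}
```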

CrossrefMappingTest.scala

@@ -23,10 +23,15 @@ class CrossrefMappingTest {
val mapper = new ObjectMapper()
@Test
-def testMissingAuthorParser():Unit = {
-val json: String = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/s41567-022-01757-y.json")).mkString
+def testMissingAuthorParser(): Unit = {
+val json: String = Source
+.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/s41567-022-01757-y.json"))
+.mkString
val result = Crossref2Oaf.convert(json)
-result.filter(o => o.isInstanceOf[Publication]).map(p=> p.asInstanceOf[Publication]).foreach(p =>assertTrue(p.getAuthor.size()>0))
+result
+.filter(o => o.isInstanceOf[Publication])
+.map(p => p.asInstanceOf[Publication])
+.foreach(p => assertTrue(p.getAuthor.size() > 0))
}
@Test