From 40764cf626e316f4fba5d999421fdac0ec25c129 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 4 Feb 2021 14:06:02 +0100 Subject: [PATCH] better logging, WIP: collectorWorker error reporting --- .../dhp/application/ApplicationUtils.java | 6 +- .../ArgumentApplicationParser.java | 20 ++--- .../collection/plugin/oai/OaiIterator.java | 12 ++- .../collection/worker/CollectorWorker.java | 6 +- .../worker/CollectorWorkerApplication.java | 5 +- .../aggregation/AbstractVocabularyTest.java | 68 ++++++++-------- .../dhp/aggregation/AggregationJobTest.java | 80 ++++++++++--------- .../transformation/TransformationJobTest.java | 45 ++++++----- 8 files changed, 125 insertions(+), 117 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java index 531c13af3..72c41a062 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ApplicationUtils.java @@ -1,14 +1,12 @@ package eu.dnetlib.dhp.application; -import java.io.File; -import java.io.FileOutputStream; -import java.io.OutputStream; +import java.io.*; import java.util.Properties; public class ApplicationUtils { - public static void populateOOZIEEnv(final String paramName, String value) throws Exception { + public static void populateOOZIEEnv(final String paramName, String value) throws IOException { File file = new File(System.getProperty("oozie.action.output.properties")); Properties props = new Properties(); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java index e65b4bb0b..0429bc25d 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java @@ -1,10 +1,7 @@ package eu.dnetlib.dhp.application; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.Serializable; -import java.io.StringWriter; +import java.io.*; import java.util.*; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -12,17 +9,21 @@ import java.util.zip.GZIPOutputStream; import org.apache.commons.cli.*; import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; public class ArgumentApplicationParser implements Serializable { + private static final Logger log = LoggerFactory.getLogger(ArgumentApplicationParser.class); + private final Options options = new Options(); private final Map objectMap = new HashMap<>(); private final List compressedValues = new ArrayList<>(); - public ArgumentApplicationParser(final String json_configuration) throws Exception { + public ArgumentApplicationParser(final String json_configuration) throws IOException { final ObjectMapper mapper = new ObjectMapper(); final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class); createOptionMap(configuration); @@ -33,7 +34,6 @@ public class ArgumentApplicationParser implements Serializable { } private void createOptionMap(final OptionsParameter[] configuration) { - Arrays .stream(configuration) .map( @@ -47,10 +47,6 @@ public class ArgumentApplicationParser implements Serializable { return o; }) .forEach(options::addOption); - - // HelpFormatter formatter = new HelpFormatter(); - // formatter.printHelp("myapp", null, options, null, true); - } public static String decompressValue(final String abstractCompressed) { @@ -61,7 +57,7 @@ public class ArgumentApplicationParser implements Serializable { IOUtils.copy(gis, stringWriter); return stringWriter.toString(); } catch (Throwable e) { - System.out.println("Wrong value to decompress:" + abstractCompressed); + log.error("Wrong value to decompress:" + abstractCompressed); throw new RuntimeException(e); } } @@ -74,7 +70,7 @@ public class ArgumentApplicationParser implements Serializable { return java.util.Base64.getEncoder().encodeToString(out.toByteArray()); } - public void parseArgument(final String[] args) throws Exception { + public void parseArgument(final String[] args) throws ParseException { CommandLineParser parser = new BasicParser(); CommandLine cmd = parser.parse(options, args); Arrays diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java index df0722905..c9cde57ce 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java @@ -113,6 +113,7 @@ public class OaiIterator implements Iterator { return downloadPage(url); } catch (final UnsupportedEncodingException e) { + errorLogList.add(e.getMessage()); throw new CollectorException(e); } } @@ -138,6 +139,7 @@ public class OaiIterator implements Iterator { + "?verb=ListRecords&resumptionToken=" + URLEncoder.encode(resumptionToken, "UTF-8")); } catch (final UnsupportedEncodingException e) { + errorLogList.add(e.getMessage()); throw new CollectorException(e); } } @@ -150,12 +152,14 @@ public class OaiIterator implements Iterator { doc = reader.read(new StringReader(xml)); } catch (final DocumentException e) { log.warn("Error parsing xml, I try to clean it. {}", e.getMessage()); + errorLogList.add(e.getMessage()); final String cleaned = XmlCleaner.cleanAllEntities(xml); try { doc = reader.read(new StringReader(cleaned)); } catch (final DocumentException e1) { final String resumptionToken = extractResumptionToken(xml); if (resumptionToken == null) { + errorLogList.add(e1.getMessage()); throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1); } return resumptionToken; @@ -166,10 +170,14 @@ public class OaiIterator implements Iterator { if (errorNode != null) { final String code = errorNode.valueOf("@code"); if ("noRecordsMatch".equalsIgnoreCase(code.trim())) { - log.warn("noRecordsMatch for oai call: " + url); + final String msg = "noRecordsMatch for oai call : " + url; + log.warn(msg); + errorLogList.add(msg); return null; } else { - throw new CollectorException(code + " - " + errorNode.getText()); + final String msg = code + " - " + errorNode.getText(); + errorLogList.add(msg); + throw new CollectorException(msg); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java index 7033cfd8e..f1d3aec9c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java @@ -29,16 +29,13 @@ public class CollectorWorker { private final String hdfsPath; - private CollectorPlugin plugin; - public CollectorWorker( final ApiDescriptor api, final String hdfsuri, - final String hdfsPath) throws CollectorException { + final String hdfsPath) { this.api = api; this.hdfsuri = hdfsuri; this.hdfsPath = hdfsPath; - this.plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol()); } public CollectorPluginErrorLogList collect() throws IOException, CollectorException { @@ -59,6 +56,7 @@ public class CollectorWorker { log.info("Created path " + hdfswritepath.toString()); + final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol()); final AtomicInteger counter = new AtomicInteger(0); try (SequenceFile.Writer writer = SequenceFile .createWriter( diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java index d89bcee54..7ec830879 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java @@ -5,6 +5,9 @@ import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*; import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; import static eu.dnetlib.dhp.application.ApplicationUtils.*; +import java.io.IOException; + +import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +38,7 @@ public class CollectorWorkerApplication { /** * @param args */ - public static void main(final String[] args) throws Exception { + public static void main(final String[] args) throws ParseException, IOException, CollectorException { final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser( IOUtils diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AbstractVocabularyTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AbstractVocabularyTest.java index 84878bd1b..8e0f0ce4b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AbstractVocabularyTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AbstractVocabularyTest.java @@ -1,52 +1,52 @@ + package eu.dnetlib.dhp.aggregation; +import static org.mockito.Mockito.lenient; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.mockito.Mock; + import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.transformation.TransformationFactory; import eu.dnetlib.dhp.transformation.TransformationJobTest; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.io.IOUtils; -import org.mockito.Mock; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import static org.mockito.Mockito.lenient; public abstract class AbstractVocabularyTest { - @Mock - protected ISLookUpService isLookUpService; + @Mock + protected ISLookUpService isLookUpService; - protected VocabularyGroup vocabularies; + protected VocabularyGroup vocabularies; + public void setUpVocabulary() throws ISLookUpException, IOException { + lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs()); + lenient() + .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY)) + .thenReturn(synonyms()); + vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService); + } - public void setUpVocabulary() throws ISLookUpException, IOException { - lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs()); + private static List vocs() throws IOException { + return IOUtils + .readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt")); + } - lenient() - .when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY)) - .thenReturn(synonyms()); - vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService); - } + private static List synonyms() throws IOException { + return IOUtils + .readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt")); + } - private static List vocs() throws IOException { - return IOUtils - .readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt")); - } + protected void mockupTrasformationRule(final String trule, final String path) throws Exception { + final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path)); - private static List synonyms() throws IOException { - return IOUtils - .readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt")); - } - - protected void mockupTrasformationRule(final String trule, final String path) throws Exception { - final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path)); - - lenient() - .when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule))) - .thenReturn(Collections.singletonList(trValue)); - } + lenient() + .when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule))) + .thenReturn(Collections.singletonList(trValue)); + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java index 8f66b6233..3cb66d5ee 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java @@ -1,12 +1,19 @@ package eu.dnetlib.dhp.aggregation; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob; -import eu.dnetlib.dhp.model.mdstore.MetadataRecord; -import eu.dnetlib.dhp.transformation.TransformSparkJobNode; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -26,22 +33,17 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import com.fasterxml.jackson.databind.ObjectMapper; -import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH; -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; +import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob; +import eu.dnetlib.dhp.model.mdstore.MetadataRecord; +import eu.dnetlib.dhp.transformation.TransformSparkJobNode; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @ExtendWith(MockitoExtension.class) -public class AggregationJobTest extends AbstractVocabularyTest{ +public class AggregationJobTest extends AbstractVocabularyTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -58,8 +60,6 @@ public class AggregationJobTest extends AbstractVocabularyTest{ private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class); - - @BeforeAll public static void beforeAll() throws IOException { provenance = IOUtils @@ -86,8 +86,6 @@ public class AggregationJobTest extends AbstractVocabularyTest{ .getOrCreate(); } - - @AfterAll public static void afterAll() throws IOException { FileUtils.deleteDirectory(workingDir.toFile()); @@ -161,36 +159,42 @@ public class AggregationJobTest extends AbstractVocabularyTest{ MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json"); MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json"); - mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl"); final Map parameters = Stream.of(new String[][] { - { - "dateOfTransformation", "1234" - }, - { - "transformationPlugin", "XSLT_TRANSFORM" - }, - { - "transformationRuleId", "simpleTRule" - }, + { + "dateOfTransformation", "1234" + }, + { + "transformationPlugin", "XSLT_TRANSFORM" + }, + { + "transformationRuleId", "simpleTRule" + }, }).collect(Collectors.toMap(data -> data[0], data -> data[1])); - TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdStoreV2.getHdfsPath()+MDSTORE_DATA_PATH, mdStoreCleanedVersion.getHdfsPath()); + TransformSparkJobNode + .transformRecords( + parameters, isLookUpService, spark, mdStoreV2.getHdfsPath() + MDSTORE_DATA_PATH, + mdStoreCleanedVersion.getHdfsPath()); final Encoder encoder = Encoders.bean(MetadataRecord.class); - final Dataset mOutput = spark.read().format("parquet").load(mdStoreCleanedVersion.getHdfsPath()+MDSTORE_DATA_PATH).as(encoder); + final Dataset mOutput = spark + .read() + .format("parquet") + .load(mdStoreCleanedVersion.getHdfsPath() + MDSTORE_DATA_PATH) + .as(encoder); final Long total = mOutput.count(); final long recordTs = mOutput - .filter((FilterFunction) p -> p.getDateOfTransformation() == 1234) - .count(); + .filter((FilterFunction) p -> p.getDateOfTransformation() == 1234) + .count(); final long recordNotEmpty = mOutput - .filter((FilterFunction) p -> !StringUtils.isBlank(p.getBody())) - .count(); + .filter((FilterFunction) p -> !StringUtils.isBlank(p.getBody())) + .count(); assertEquals(total, recordTs); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java index 648d7c8a1..9d6dacf0c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java @@ -1,12 +1,18 @@ package eu.dnetlib.dhp.transformation; -import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest; -import eu.dnetlib.dhp.aggregation.common.AggregationCounter; -import eu.dnetlib.dhp.collection.CollectionJobTest; -import eu.dnetlib.dhp.model.mdstore.MetadataRecord; -import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.lenient; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -21,17 +27,12 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.mockito.junit.jupiter.MockitoExtension; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.lenient; +import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest; +import eu.dnetlib.dhp.aggregation.common.AggregationCounter; +import eu.dnetlib.dhp.collection.CollectionJobTest; +import eu.dnetlib.dhp.model.mdstore.MetadataRecord; +import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; @ExtendWith(MockitoExtension.class) public class TransformationJobTest extends AbstractVocabularyTest { @@ -46,7 +47,6 @@ public class TransformationJobTest extends AbstractVocabularyTest { spark = SparkSession.builder().config(conf).getOrCreate(); } - @BeforeEach public void setUp() throws IOException, ISLookUpException { setUpVocabulary(); @@ -101,7 +101,11 @@ public class TransformationJobTest extends AbstractVocabularyTest { // TODO introduce useful assertions final Encoder encoder = Encoders.bean(MetadataRecord.class); - final Dataset mOutput = spark.read().format("parquet").load(mdstore_output+MDSTORE_DATA_PATH).as(encoder); + final Dataset mOutput = spark + .read() + .format("parquet") + .load(mdstore_output + MDSTORE_DATA_PATH) + .as(encoder); final Long total = mOutput.count(); @@ -131,13 +135,10 @@ public class TransformationJobTest extends AbstractVocabularyTest { Files.deleteIfExists(tempDirWithPrefix); } - - private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception { final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path)); final LongAccumulator la = new LongAccumulator(); return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies); } - }