1
0
Fork 0

better logging, WIP: collectorWorker error reporting

This commit is contained in:
Claudio Atzori 2021-02-04 14:06:02 +01:00
parent 69c253710b
commit 40764cf626
8 changed files with 125 additions and 117 deletions

View File

@ -1,14 +1,12 @@
package eu.dnetlib.dhp.application; package eu.dnetlib.dhp.application;
import java.io.File; import java.io.*;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties; import java.util.Properties;
public class ApplicationUtils { public class ApplicationUtils {
public static void populateOOZIEEnv(final String paramName, String value) throws Exception { public static void populateOOZIEEnv(final String paramName, String value) throws IOException {
File file = new File(System.getProperty("oozie.action.output.properties")); File file = new File(System.getProperty("oozie.action.output.properties"));
Properties props = new Properties(); Properties props = new Properties();

View File

@ -1,10 +1,7 @@
package eu.dnetlib.dhp.application; package eu.dnetlib.dhp.application;
import java.io.ByteArrayInputStream; import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.*; import java.util.*;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
@ -12,17 +9,21 @@ import java.util.zip.GZIPOutputStream;
import org.apache.commons.cli.*; import org.apache.commons.cli.*;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
public class ArgumentApplicationParser implements Serializable { public class ArgumentApplicationParser implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ArgumentApplicationParser.class);
private final Options options = new Options(); private final Options options = new Options();
private final Map<String, String> objectMap = new HashMap<>(); private final Map<String, String> objectMap = new HashMap<>();
private final List<String> compressedValues = new ArrayList<>(); private final List<String> compressedValues = new ArrayList<>();
public ArgumentApplicationParser(final String json_configuration) throws Exception { public ArgumentApplicationParser(final String json_configuration) throws IOException {
final ObjectMapper mapper = new ObjectMapper(); final ObjectMapper mapper = new ObjectMapper();
final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class); final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class);
createOptionMap(configuration); createOptionMap(configuration);
@ -33,7 +34,6 @@ public class ArgumentApplicationParser implements Serializable {
} }
private void createOptionMap(final OptionsParameter[] configuration) { private void createOptionMap(final OptionsParameter[] configuration) {
Arrays Arrays
.stream(configuration) .stream(configuration)
.map( .map(
@ -47,10 +47,6 @@ public class ArgumentApplicationParser implements Serializable {
return o; return o;
}) })
.forEach(options::addOption); .forEach(options::addOption);
// HelpFormatter formatter = new HelpFormatter();
// formatter.printHelp("myapp", null, options, null, true);
} }
public static String decompressValue(final String abstractCompressed) { public static String decompressValue(final String abstractCompressed) {
@ -61,7 +57,7 @@ public class ArgumentApplicationParser implements Serializable {
IOUtils.copy(gis, stringWriter); IOUtils.copy(gis, stringWriter);
return stringWriter.toString(); return stringWriter.toString();
} catch (Throwable e) { } catch (Throwable e) {
System.out.println("Wrong value to decompress:" + abstractCompressed); log.error("Wrong value to decompress:" + abstractCompressed);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@ -74,7 +70,7 @@ public class ArgumentApplicationParser implements Serializable {
return java.util.Base64.getEncoder().encodeToString(out.toByteArray()); return java.util.Base64.getEncoder().encodeToString(out.toByteArray());
} }
public void parseArgument(final String[] args) throws Exception { public void parseArgument(final String[] args) throws ParseException {
CommandLineParser parser = new BasicParser(); CommandLineParser parser = new BasicParser();
CommandLine cmd = parser.parse(options, args); CommandLine cmd = parser.parse(options, args);
Arrays Arrays

View File

@ -113,6 +113,7 @@ public class OaiIterator implements Iterator<String> {
return downloadPage(url); return downloadPage(url);
} catch (final UnsupportedEncodingException e) { } catch (final UnsupportedEncodingException e) {
errorLogList.add(e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
} }
} }
@ -138,6 +139,7 @@ public class OaiIterator implements Iterator<String> {
+ "?verb=ListRecords&resumptionToken=" + "?verb=ListRecords&resumptionToken="
+ URLEncoder.encode(resumptionToken, "UTF-8")); + URLEncoder.encode(resumptionToken, "UTF-8"));
} catch (final UnsupportedEncodingException e) { } catch (final UnsupportedEncodingException e) {
errorLogList.add(e.getMessage());
throw new CollectorException(e); throw new CollectorException(e);
} }
} }
@ -150,12 +152,14 @@ public class OaiIterator implements Iterator<String> {
doc = reader.read(new StringReader(xml)); doc = reader.read(new StringReader(xml));
} catch (final DocumentException e) { } catch (final DocumentException e) {
log.warn("Error parsing xml, I try to clean it. {}", e.getMessage()); log.warn("Error parsing xml, I try to clean it. {}", e.getMessage());
errorLogList.add(e.getMessage());
final String cleaned = XmlCleaner.cleanAllEntities(xml); final String cleaned = XmlCleaner.cleanAllEntities(xml);
try { try {
doc = reader.read(new StringReader(cleaned)); doc = reader.read(new StringReader(cleaned));
} catch (final DocumentException e1) { } catch (final DocumentException e1) {
final String resumptionToken = extractResumptionToken(xml); final String resumptionToken = extractResumptionToken(xml);
if (resumptionToken == null) { if (resumptionToken == null) {
errorLogList.add(e1.getMessage());
throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1); throw new CollectorException("Error parsing cleaned document:\n" + cleaned, e1);
} }
return resumptionToken; return resumptionToken;
@ -166,10 +170,14 @@ public class OaiIterator implements Iterator<String> {
if (errorNode != null) { if (errorNode != null) {
final String code = errorNode.valueOf("@code"); final String code = errorNode.valueOf("@code");
if ("noRecordsMatch".equalsIgnoreCase(code.trim())) { if ("noRecordsMatch".equalsIgnoreCase(code.trim())) {
log.warn("noRecordsMatch for oai call: " + url); final String msg = "noRecordsMatch for oai call : " + url;
log.warn(msg);
errorLogList.add(msg);
return null; return null;
} else { } else {
throw new CollectorException(code + " - " + errorNode.getText()); final String msg = code + " - " + errorNode.getText();
errorLogList.add(msg);
throw new CollectorException(msg);
} }
} }

View File

@ -29,16 +29,13 @@ public class CollectorWorker {
private final String hdfsPath; private final String hdfsPath;
private CollectorPlugin plugin;
public CollectorWorker( public CollectorWorker(
final ApiDescriptor api, final ApiDescriptor api,
final String hdfsuri, final String hdfsuri,
final String hdfsPath) throws CollectorException { final String hdfsPath) {
this.api = api; this.api = api;
this.hdfsuri = hdfsuri; this.hdfsuri = hdfsuri;
this.hdfsPath = hdfsPath; this.hdfsPath = hdfsPath;
this.plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
} }
public CollectorPluginErrorLogList collect() throws IOException, CollectorException { public CollectorPluginErrorLogList collect() throws IOException, CollectorException {
@ -59,6 +56,7 @@ public class CollectorWorker {
log.info("Created path " + hdfswritepath.toString()); log.info("Created path " + hdfswritepath.toString());
final CollectorPlugin plugin = CollectorPluginFactory.getPluginByProtocol(api.getProtocol());
final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger counter = new AtomicInteger(0);
try (SequenceFile.Writer writer = SequenceFile try (SequenceFile.Writer writer = SequenceFile
.createWriter( .createWriter(

View File

@ -5,6 +5,9 @@ import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
import static eu.dnetlib.dhp.application.ApplicationUtils.*; import static eu.dnetlib.dhp.application.ApplicationUtils.*;
import java.io.IOException;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -35,7 +38,7 @@ public class CollectorWorkerApplication {
/** /**
* @param args * @param args
*/ */
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws ParseException, IOException, CollectorException {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser( final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils IOUtils

View File

@ -1,18 +1,20 @@
package eu.dnetlib.dhp.aggregation; package eu.dnetlib.dhp.aggregation;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.mockito.Mock;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.transformation.TransformationFactory; import eu.dnetlib.dhp.transformation.TransformationFactory;
import eu.dnetlib.dhp.transformation.TransformationJobTest; import eu.dnetlib.dhp.transformation.TransformationJobTest;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.mockito.Mock;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static org.mockito.Mockito.lenient;
public abstract class AbstractVocabularyTest { public abstract class AbstractVocabularyTest {
@ -21,8 +23,6 @@ public abstract class AbstractVocabularyTest {
protected VocabularyGroup vocabularies; protected VocabularyGroup vocabularies;
public void setUpVocabulary() throws ISLookUpException, IOException { public void setUpVocabulary() throws ISLookUpException, IOException {
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs()); lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());

View File

@ -1,12 +1,19 @@
package eu.dnetlib.dhp.aggregation; package eu.dnetlib.dhp.aggregation;
import com.fasterxml.jackson.databind.ObjectMapper; import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import java.io.File;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode; import java.io.FileOutputStream;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -26,18 +33,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import static org.junit.jupiter.api.Assertions.assertEquals; import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@ -58,8 +60,6 @@ public class AggregationJobTest extends AbstractVocabularyTest{
private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class); private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
provenance = IOUtils provenance = IOUtils
@ -86,8 +86,6 @@ public class AggregationJobTest extends AbstractVocabularyTest{
.getOrCreate(); .getOrCreate();
} }
@AfterAll @AfterAll
public static void afterAll() throws IOException { public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile()); FileUtils.deleteDirectory(workingDir.toFile());
@ -161,7 +159,6 @@ public class AggregationJobTest extends AbstractVocabularyTest{
MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json"); MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json"); MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl"); mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
final Map<String, String> parameters = Stream.of(new String[][] { final Map<String, String> parameters = Stream.of(new String[][] {
@ -177,10 +174,17 @@ public class AggregationJobTest extends AbstractVocabularyTest{
}).collect(Collectors.toMap(data -> data[0], data -> data[1])); }).collect(Collectors.toMap(data -> data[0], data -> data[1]));
TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdStoreV2.getHdfsPath()+MDSTORE_DATA_PATH, mdStoreCleanedVersion.getHdfsPath()); TransformSparkJobNode
.transformRecords(
parameters, isLookUpService, spark, mdStoreV2.getHdfsPath() + MDSTORE_DATA_PATH,
mdStoreCleanedVersion.getHdfsPath());
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdStoreCleanedVersion.getHdfsPath()+MDSTORE_DATA_PATH).as(encoder); final Dataset<MetadataRecord> mOutput = spark
.read()
.format("parquet")
.load(mdStoreCleanedVersion.getHdfsPath() + MDSTORE_DATA_PATH)
.as(encoder);
final Long total = mOutput.count(); final Long total = mOutput.count();

View File

@ -1,12 +1,18 @@
package eu.dnetlib.dhp.transformation; package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest; import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter; import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.collection.CollectionJobTest; import static org.mockito.Mockito.lenient;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; import java.io.IOException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -21,17 +27,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException; import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import java.nio.file.Files; import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import java.nio.file.Path; import eu.dnetlib.dhp.collection.CollectionJobTest;
import java.util.Collections; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import java.util.Map; import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
import java.util.stream.Collectors; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import java.util.stream.Stream;
import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.MDSTORE_DATA_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class TransformationJobTest extends AbstractVocabularyTest { public class TransformationJobTest extends AbstractVocabularyTest {
@ -46,7 +47,6 @@ public class TransformationJobTest extends AbstractVocabularyTest {
spark = SparkSession.builder().config(conf).getOrCreate(); spark = SparkSession.builder().config(conf).getOrCreate();
} }
@BeforeEach @BeforeEach
public void setUp() throws IOException, ISLookUpException { public void setUp() throws IOException, ISLookUpException {
setUpVocabulary(); setUpVocabulary();
@ -101,7 +101,11 @@ public class TransformationJobTest extends AbstractVocabularyTest {
// TODO introduce useful assertions // TODO introduce useful assertions
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdstore_output+MDSTORE_DATA_PATH).as(encoder); final Dataset<MetadataRecord> mOutput = spark
.read()
.format("parquet")
.load(mdstore_output + MDSTORE_DATA_PATH)
.as(encoder);
final Long total = mOutput.count(); final Long total = mOutput.count();
@ -131,13 +135,10 @@ public class TransformationJobTest extends AbstractVocabularyTest {
Files.deleteIfExists(tempDirWithPrefix); Files.deleteIfExists(tempDirWithPrefix);
} }
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception { private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path)); final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
final LongAccumulator la = new LongAccumulator(); final LongAccumulator la = new LongAccumulator();
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies); return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
} }
} }