From 6c805b078414b9fe3830ea4a983f3c41f30572ef Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 28 Sep 2022 14:15:07 +0200
Subject: [PATCH 1/2] [metadata transformation] separate the XSLT creation
 errors from the XSLT application, let the 1st propagate

---
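Note: the point of this patch is to fail fast. The transformation plugin is now
built inside the try-with-resources block, with the AggregatorReport passed down
to it, so a stylesheet that does not even compile aborts the job (and gets
reported), while records that merely fail to transform are counted and dropped.
The sketch below is a hypothetical, simplified reading of that split, not the
patched class itself: AggregationCounter, AggregatorReport, MetadataRecord and
report.put(...) appear in the diff; MetadataRecord.getBody()/setBody() and the
exact Saxon s9api wiring are assumptions, and Spark serialization of the
Processor field is ignored for brevity.

    package sketch;

    import java.io.StringReader;
    import java.io.StringWriter;

    import javax.xml.transform.stream.StreamSource;

    import org.apache.spark.api.java.function.MapFunction;

    import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
    import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
    import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
    import net.sf.saxon.s9api.Processor;
    import net.sf.saxon.s9api.SaxonApiException;
    import net.sf.saxon.s9api.XsltExecutable;
    import net.sf.saxon.s9api.XsltTransformer;

    public class XsltFunctionSketch implements MapFunction<MetadataRecord, MetadataRecord> {

        private final AggregationCounter counters;
        private final AggregatorReport report;
        private final Processor processor = new Processor(false);
        private final XsltExecutable stylesheet; // compiled once, at construction time

        public XsltFunctionSketch(final AggregationCounter counters, final AggregatorReport report,
            final String transformationRule) throws SaxonApiException {
            this.counters = counters;
            this.report = report;
            // a malformed XSLT fails right here, before any record is touched,
            // and the SaxonApiException propagates to the caller
            this.stylesheet = processor
                .newXsltCompiler()
                .compile(new StreamSource(new StringReader(transformationRule)));
        }

        @Override
        public MetadataRecord call(final MetadataRecord value) {
            counters.getTotalItems().add(1);
            try {
                final StringWriter output = new StringWriter();
                final XsltTransformer transformer = stylesheet.load();
                transformer.setSource(new StreamSource(new StringReader(value.getBody())));
                transformer.setDestination(processor.newSerializer(output));
                transformer.transform();
                value.setBody(output.toString());
                counters.getProcessedItems().add(1);
                return value;
            } catch (Throwable t) {
                // per-record failures do not abort the job: count, report, drop the record
                counters.getErrorItems().add(1);
                report.put(t.getClass().getName(), t.getMessage());
                return null; // removed downstream by the Objects::nonNull filter
            }
        }
    }
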
 .../transformation/TransformSparkJobNode.java | 10 +--
 .../transformation/TransformationFactory.java |  6 +-
 .../xslt/XSLTTransformationFunction.java      | 78 +++++++++++--------
 .../transformation/TransformationJobTest.java | 19 ++++-
 4 files changed, 69 insertions(+), 44 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index ed867c7f2..861c343c8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -110,9 +110,6 @@ public class TransformSparkJobNode {
 		final String workflowId = args.get("workflowId");
 		log.info("workflowId is {}", workflowId);
 
-		MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
-			.getTransformationPlugin(args, ct, isLookUpService);
-
 		final Dataset<MetadataRecord> inputMDStore = spark
 			.read()
 			.format("parquet")
@@ -124,10 +121,13 @@ public class TransformSparkJobNode {
 		final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
 		try (AggregatorReport report = new AggregatorReport(messageSender)) {
 			try {
+				final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
+					.getTransformationPlugin(args, ct, report, isLookUpService);
+
 				JavaRDD<MetadataRecord> mdstore = inputMDStore
 					.javaRDD()
 					.repartition(getRepartitionNumber(totalInput, rpt))
-					.map((Function<MetadataRecord, MetadataRecord>) x::call)
+					.map((Function<MetadataRecord, MetadataRecord>) tr::call)
 					.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
 				saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
 
@@ -141,7 +141,7 @@ public class TransformSparkJobNode {
 					"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
 			} catch (Throwable e) {
 				log.error("error during record transformation", e);
-				report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
+				report.put(e.getClass().getName(), e.getMessage());
 				report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
 				report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
 				report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
index e93f3b518..1d783c145 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
@@ -10,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
@@ -27,7 +28,8 @@ public class TransformationFactory {
 	}
 
 	public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
-		final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
+		final Map<String, String> jobArgument, final AggregationCounter counters, final AggregatorReport report,
+		final ISLookUpService isLookupService)
 		throws DnetTransformationException {
 
 		try {
@@ -45,7 +47,7 @@ public class TransformationFactory {
 				transformationRuleId, isLookupService);
 
 			final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
-			return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
+			return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation,
 				vocabularies);
 		}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
index 54192a7be..d16310cff 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
@@ -7,10 +7,13 @@ import java.nio.charset.StandardCharsets;
 
 import javax.xml.transform.stream.StreamSource;
 
+import org.apache.avro.test.specialtypes.value;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.spark.api.java.function.MapFunction;
 
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
 import net.sf.saxon.s9api.*;
@@ -25,9 +28,9 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
[...]

From: Claudio Atzori
Date: Thu, 29 Sep 2022 11:05:43 +0200
Subject: [PATCH 2/2] WIP: error handling during XSLT transformation

---
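Note: this WIP commit drops the driver-side try/catch entirely. With creation
errors propagating out of getTransformationPlugin and record-level errors
handled inside the function, the inner catch block added by the first patch is
redundant: the try-with-resources alone guarantees the AggregatorReport is
closed, and therefore flushed, even when plugin creation throws. The following
is a minimal, self-contained toy demonstrating that semantics; ToyReport is a
stand-in for AggregatorReport (which, per the diff, is a Closeable map of
string entries), not the dnet-hadoop class.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.LinkedHashMap;

    public class ReportFlushDemo {

        // stand-in for AggregatorReport: a map of report entries, flushed on close()
        static class ToyReport extends LinkedHashMap<String, String> implements Closeable {
            @Override
            public void close() throws IOException {
                System.out.println("flushing report: " + this);
            }
        }

        public static void main(String[] args) {
            try (ToyReport report = new ToyReport()) {
                report.put("stage", "plugin creation");
                // stands in for TransformationFactory.getTransformationPlugin(...) throwing
                throw new IllegalStateException("cannot compile the transformation rule");
            } catch (IllegalStateException | IOException e) {
                // the resource is closed *before* this catch runs, so the report
                // above is already flushed when the caller sees the failure
                System.out.println("driver sees: " + e.getMessage());
            }
        }
    }
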
 .../transformation/TransformSparkJobNode.java | 41 ++++++++-----------
 .../xslt/XSLTTransformationFunction.java      | 11 ++++-
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index 861c343c8..d215dcfed 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -77,11 +77,11 @@ public class TransformSparkJobNode {
 			.ofNullable(parser.get("recordsPerTask"))
 			.map(Integer::valueOf)
 			.orElse(RECORDS_PER_TASK);
+
 		log.info("recordsPerTask: {}", rpt);
 
 		final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 		final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
-
 		log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
 
 		SparkConf conf = new SparkConf();
@@ -120,33 +120,24 @@ public class TransformSparkJobNode {
 		final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
 		try (AggregatorReport report = new AggregatorReport(messageSender)) {
-			try {
-				final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
-					.getTransformationPlugin(args, ct, report, isLookUpService);
+			final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
+				.getTransformationPlugin(args, ct, report, isLookUpService);
 
-				JavaRDD<MetadataRecord> mdstore = inputMDStore
-					.javaRDD()
-					.repartition(getRepartitionNumber(totalInput, rpt))
-					.map((Function<MetadataRecord, MetadataRecord>) tr::call)
-					.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
-				saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
+			JavaRDD<MetadataRecord> mdstore = inputMDStore
+				.javaRDD()
+				.repartition(getRepartitionNumber(totalInput, rpt))
+				.map((Function<MetadataRecord, MetadataRecord>) tr::call)
+				.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
+			saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
 
-				log.info("Transformed item {}", ct.getProcessedItems().count());
-				log.info("Total item {}", ct.getTotalItems().count());
-				log.info("Transformation Error item {}", ct.getErrorItems().count());
+			log.info("Transformed item {}", ct.getProcessedItems().count());
+			log.info("Total item {}", ct.getTotalItems().count());
+			log.info("Transformation Error item {}", ct.getErrorItems().count());
 
-				final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
-				writeHdfsFile(
-					spark.sparkContext().hadoopConfiguration(),
-					"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
-			} catch (Throwable e) {
-				log.error("error during record transformation", e);
-				report.put(e.getClass().getName(), e.getMessage());
-				report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
-				report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
-				report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
-				throw e;
-			}
+			final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
+			writeHdfsFile(
+				spark.sparkContext().hadoopConfiguration(),
+				"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
 		}
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
index d16310cff..ad288b347 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
@@ -1,6 +1,9 @@
 
 package eu.dnetlib.dhp.transformation.xslt;
 
+import static eu.dnetlib.dhp.common.Constants.*;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
@@ -70,7 +73,13 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
[...]
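Annotation: the series is truncated here; the body of the hunk above is
missing, as is the TransformationJobTest.java diff counted in the first
patch's diffstat. Judging from the two imports it adds (Constants.* and
IOException), the cut hunk presumably makes call() record per-record failures
in the AggregatorReport. The hypothetical test below pins down the other half
of the new contract, reusing XsltFunctionSketch from the note under the first
patch (JUnit 5 assumed; nulls are fine there because the constructor only
stores them before compiling); it is not the missing test diff.

    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.junit.jupiter.api.Test;

    import net.sf.saxon.s9api.SaxonApiException;

    public class XsltCreationErrorTest {

        @Test
        public void invalidStylesheetFailsAtCreation() {
            final String brokenRule = "<xsl:stylesheet"; // not even well-formed XML
            // the new contract: compilation errors surface at construction time and
            // propagate, instead of yielding a null for every record in the map phase
            assertThrows(SaxonApiException.class, () -> new XsltFunctionSketch(null, null, brokenRule));
        }
    }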