From 6c805b078414b9fe3830ea4a983f3c41f30572ef Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 28 Sep 2022 14:15:07 +0200 Subject: [PATCH] [metadata transformation] separate the XSLT creation errors from the XSLT application, let the 1st propagate --- .../transformation/TransformSparkJobNode.java | 10 +-- .../transformation/TransformationFactory.java | 6 +- .../xslt/XSLTTransformationFunction.java | 78 +++++++++++-------- .../transformation/TransformationJobTest.java | 19 ++++- 4 files changed, 69 insertions(+), 44 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index ed867c7f2..861c343c8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -110,9 +110,6 @@ public class TransformSparkJobNode { final String workflowId = args.get("workflowId"); log.info("workflowId is {}", workflowId); - MapFunction x = TransformationFactory - .getTransformationPlugin(args, ct, isLookUpService); - final Dataset inputMDStore = spark .read() .format("parquet") @@ -124,10 +121,13 @@ public class TransformSparkJobNode { final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId); try (AggregatorReport report = new AggregatorReport(messageSender)) { try { + final MapFunction tr = TransformationFactory + .getTransformationPlugin(args, ct, report, isLookUpService); + JavaRDD mdstore = inputMDStore .javaRDD() .repartition(getRepartitionNumber(totalInput, rpt)) - .map((Function) x::call) + .map((Function) tr::call) .filter((Function) Objects::nonNull); saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH); @@ -141,7 +141,7 @@ public class TransformSparkJobNode { "" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH); } catch (Throwable e) { log.error("error during record transformation", e); - report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage()); + report.put(e.getClass().getName(), e.getMessage()); report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString()); report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString()); report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString()); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java index e93f3b518..1d783c145 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java @@ -10,6 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.aggregation.common.AggregationCounter; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.mdstore.MetadataRecord; import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; @@ -27,7 +28,8 @@ public class TransformationFactory { } public static MapFunction getTransformationPlugin( - final Map jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService) + final Map jobArgument, final AggregationCounter counters, final AggregatorReport report, + final ISLookUpService isLookupService) throws DnetTransformationException { try { @@ -45,7 +47,7 @@ public class TransformationFactory { transformationRuleId, isLookupService); final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation")); - return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation, + return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation, vocabularies); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java index 54192a7be..d16310cff 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java @@ -7,10 +7,13 @@ import java.nio.charset.StandardCharsets; import javax.xml.transform.stream.StreamSource; +import org.apache.avro.test.specialtypes.value; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.spark.api.java.function.MapFunction; import eu.dnetlib.dhp.aggregation.common.AggregationCounter; +import eu.dnetlib.dhp.common.aggregation.AggregatorReport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.mdstore.MetadataRecord; import net.sf.saxon.s9api.*; @@ -25,9 +28,9 @@ public class XSLTTransformationFunction implements MapFunction