From 58467aaf1eef7043688cf49d2c5cc1ffd745dca3 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 17 Feb 2021 16:14:41 +0100
Subject: [PATCH] WIP: transformation workflow error reporting

---
 .../aggregation/common/AggregatorReport.java  |  2 +-
 .../transformation/TransformSparkJobNode.java | 51 ++++++++++++------
 .../dhp/transformation/oozie_app/workflow.xml | 10 ++++
 .../transformation_input_parameters.json      | 15 ++++--
 4 files changed, 59 insertions(+), 19 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java
index 9f91c4247..c822a6723 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregatorReport.java
@@ -8,13 +8,13 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import eu.dnetlib.dhp.utils.DHPUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.gson.Gson;
 
 import eu.dnetlib.dhp.message.MessageSender;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class AggregatorReport extends LinkedHashMap<String, String> implements Closeable {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index f9a18987d..0b3de6490 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -21,11 +21,13 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
+import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.message.MessageSender;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 public class TransformSparkJobNode {
 
@@ -91,23 +93,42 @@ public class TransformSparkJobNode {
 
 		final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
 		final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
 
-		final Dataset<MetadataRecord> mdstore = spark
-			.read()
-			.format("parquet")
-			.load(inputPath)
-			.as(encoder)
-			.map(
-				TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
-				encoder);
-		saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH);
+		final String dnetMessageManagerURL = args.get(DNET_MESSAGE_MGR_URL);
+		log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL);
 
log.info("Transformed item " + ct.getProcessedItems().count()); - log.info("Total item " + ct.getTotalItems().count()); - log.info("Transformation Error item " + ct.getErrorItems().count()); + final String workflowId = args.get("workflowId"); + log.info("workflowId is {}", workflowId); - writeHdfsFile( - spark.sparkContext().hadoopConfiguration(), - "" + spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count(), outputBasePath + MDSTORE_SIZE_PATH); + final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId); + try (AggregatorReport report = new AggregatorReport(messageSender)) { + try { + final Dataset mdstore = spark + .read() + .format("parquet") + .load(inputPath) + .as(encoder) + .map( + TransformationFactory.getTransformationPlugin(args, ct, isLookUpService), + encoder); + saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH); + + log.info("Transformed item " + ct.getProcessedItems().count()); + log.info("Total item " + ct.getTotalItems().count()); + log.info("Transformation Error item " + ct.getErrorItems().count()); + + final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count(); + writeHdfsFile( + spark.sparkContext().hadoopConfiguration(), + "" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH); + } catch (Throwable e) { + log.error("error during record transformation", e); + report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage()); + report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString()); + report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString()); + report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString()); + throw e; + } + } } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml index 9e01936d4..61e5710fa 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml @@ -29,6 +29,14 @@ isLookupUrl The IS lookUp service endopoint + + workflowId + The identifier of the workflow + + + dnetMessageManagerURL + The URI of the Dnet Message Manager + @@ -95,6 +103,8 @@ --transformationPlugin${transformationPlugin} --transformationRuleId${transformationRuleId} --isLookupUrl${isLookupUrl} + --workflowId${workflowId} + --dnetMessageManagerURL${dnetMessageManagerURL} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json index d92698de5..ee9099dde 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json @@ -36,9 +36,18 @@ "paramDescription": "the Information System Service LookUp URL", "paramRequired": true }, - - - + { + "paramName": "dm", + "paramLongName": "dnetMessageManagerURL", + "paramDescription": "the End point URL to send Messages", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workflowId", + "paramDescription": "the identifier of the dnet Workflow", + "paramRequired": true + }, { "paramName": "tp", "paramLongName": 
"transformationPlugin",