1
0
Fork 0

WIP: transformation workflow error reporting

This commit is contained in:
Claudio Atzori 2021-02-17 16:14:41 +01:00
parent cc88701f29
commit 58467aaf1e
4 changed files with 61 additions and 20 deletions

View File

@ -8,13 +8,13 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.message.MessageSender;
import eu.dnetlib.dhp.utils.DHPUtils;
public class AggregatorReport extends LinkedHashMap<String, String> implements Closeable {

View File

@ -21,11 +21,14 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.message.MessageSender;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import parquet.hadoop.ParquetReader;
public class TransformSparkJobNode {
@ -54,7 +57,7 @@ public class TransformSparkJobNode {
final MDStoreVersion nativeMdStoreVersion = MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class);
final String inputPath = nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH;
log.info("inputPath: {}", inputPath);
ParquetReader
final MDStoreVersion cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, MDStoreVersion.class);
final String outputBasePath = cleanedMdStoreVersion.getHdfsPath();
log.info("outputBasePath: {}", outputBasePath);
@ -91,23 +94,42 @@ public class TransformSparkJobNode {
final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems);
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark
.read()
.format("parquet")
.load(inputPath)
.as(encoder)
.map(
TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
encoder);
saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH);
final String dnetMessageManagerURL = args.get(DNET_MESSAGE_MGR_URL);
log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL);
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count());
final String workflowId = args.get("workflowId");
log.info("workflowId is {}", workflowId);
writeHdfsFile(
spark.sparkContext().hadoopConfiguration(),
"" + spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count(), outputBasePath + MDSTORE_SIZE_PATH);
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
try (AggregatorReport report = new AggregatorReport(messageSender)) {
try {
final Dataset<MetadataRecord> mdstore = spark
.read()
.format("parquet")
.load(inputPath)
.as(encoder)
.map(
TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
encoder);
saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH);
log.info("Transformed item " + ct.getProcessedItems().count());
log.info("Total item " + ct.getTotalItems().count());
log.info("Transformation Error item " + ct.getErrorItems().count());
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
writeHdfsFile(
spark.sparkContext().hadoopConfiguration(),
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
} catch (Throwable e) {
log.error("error during record transformation", e);
report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
throw e;
}
}
}
}

View File

@ -29,6 +29,14 @@
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
<property>
<name>workflowId</name>
<description>The identifier of the workflow</description>
</property>
<property>
<name>dnetMessageManagerURL</name>
<description>The URI of the Dnet Message Manager</description>
</property>
</parameters>
<start to="BeginRead"/>
@ -95,6 +103,8 @@
<arg>--transformationPlugin</arg><arg>${transformationPlugin}</arg>
<arg>--transformationRuleId</arg><arg>${transformationRuleId}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--workflowId</arg><arg>${workflowId}</arg>
<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
</spark>
<ok to="EndRead"/>
<error to="EndReadRollBack"/>

View File

@ -36,9 +36,18 @@
"paramDescription": "the Information System Service LookUp URL",
"paramRequired": true
},
{
"paramName": "dm",
"paramLongName": "dnetMessageManagerURL",
"paramDescription": "the End point URL to send Messages",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": true
},
{
"paramName": "tp",
"paramLongName": "transformationPlugin",