forked from D-Net/dnet-hadoop
WIP: error handling during XSLT transformation
parent f0a9c370b6 · commit a2095dc725
TransformSparkJobNode.java

```diff
@@ -77,11 +77,11 @@ public class TransformSparkJobNode {
 			.ofNullable(parser.get("recordsPerTask"))
 			.map(Integer::valueOf)
 			.orElse(RECORDS_PER_TASK);
 		log.info("recordsPerTask: {}", rpt);
 
 		final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 
 		final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
 
 		log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
 
 		SparkConf conf = new SparkConf();
```
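The `recordsPerTask` lookup in the hunk above is the usual `Optional` idiom for a CLI argument that may be absent. A minimal standalone sketch of that idiom follows; the default value and the map standing in for the job's argument parser are illustrative, not taken from the repository:

```java
import java.util.Map;
import java.util.Optional;

public class RecordsPerTaskDemo {

	// illustrative default; the real RECORDS_PER_TASK value lives in the job's constants
	private static final int RECORDS_PER_TASK = 200;

	public static void main(String[] args) {
		// stand-in for parser.get("recordsPerTask"), which may return null
		final Map<String, String> parsedArgs = Map.of();

		final int rpt = Optional
			.ofNullable(parsedArgs.get("recordsPerTask"))
			.map(Integer::valueOf)
			.orElse(RECORDS_PER_TASK);

		System.out.println("recordsPerTask: " + rpt); // falls back to the default
	}
}
```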
```diff
@@ -120,33 +120,24 @@ public class TransformSparkJobNode {
 
 		final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
 		try (AggregatorReport report = new AggregatorReport(messageSender)) {
-			try {
 			final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
 				.getTransformationPlugin(args, ct, report, isLookUpService);
 
 			JavaRDD<MetadataRecord> mdstore = inputMDStore
 				.javaRDD()
 				.repartition(getRepartitionNumber(totalInput, rpt))
 				.map((Function<MetadataRecord, MetadataRecord>) tr::call)
 				.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
 			saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
 
 			log.info("Transformed item {}", ct.getProcessedItems().count());
 			log.info("Total item {}", ct.getTotalItems().count());
 			log.info("Transformation Error item {}", ct.getErrorItems().count());
 
 			final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
 			writeHdfsFile(
 				spark.sparkContext().hadoopConfiguration(),
 				"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
-			} catch (Throwable e) {
-				log.error("error during record transformation", e);
-				report.put(e.getClass().getName(), e.getMessage());
-				report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
-				report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
-				report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
-				throw e;
-			}
 		}
 	}
```
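The net effect of this hunk: the inner `try { … } catch (Throwable e)` block, which logged the failure and copied the counters (`CONTENT_TOTALITEMS`, `CONTENT_INVALIDRECORDS`, `CONTENT_TRANSFORMEDRECORDS`) into the report before rethrowing, is removed; per-record error reporting moves into the XSLT transformation function below. The `AggregatorReport` still sits in a try-with-resources, so it is closed, and its message dispatched, on both the success and the failure path. A minimal sketch of that pattern, using a hypothetical stand-in for the dnet-hadoop `AggregatorReport`:

```java
import java.io.Closeable;
import java.io.IOException;

public class ReportDemo {

	// hypothetical stand-in for AggregatorReport; the real class wraps a
	// MessageSender and forwards the collected entries when it is closed
	static class Report implements Closeable {
		void put(String key, String value) {
			System.out.println(key + " = " + value);
		}

		@Override
		public void close() throws IOException {
			System.out.println("report dispatched");
		}
	}

	public static void main(String[] args) throws IOException {
		try (Report report = new Report()) {
			report.put("CONTENT_TRANSFORMEDRECORDS", "42");
			// even if the body threw here, close() would still run,
			// so the report reaches its destination before the job dies
		}
	}
}
```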
XSLTTransformationFunction.java

```diff
@@ -1,6 +1,9 @@
 
 package eu.dnetlib.dhp.transformation.xslt;
 
+import static eu.dnetlib.dhp.common.Constants.*;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
```
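Both added imports serve the new catch block below: `java.io.IOException` because `AggregatorReport.close()` declares it, and the static `Constants.*` import presumably for the report keys (`CONTENT_TOTALITEMS` and friends) that this WIP moves from the Spark job node into the transformation function.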
```diff
@@ -70,7 +73,13 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
 				.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)))
 				.load();
 		} catch (SaxonApiException e) {
-			throw new RuntimeException(e);
+			report.put(e.getClass().getName(), e.getMessage());
+			try {
+				report.close();
+			} catch (IOException ex) {
+				throw new IllegalArgumentException("error compiling the XSLT", e);
+			}
+			throw new IllegalArgumentException("error compiling the XSLT", e);
 		}
 
 		transformer
```
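The replacement catch block records the compile failure in the report, closes the report so the message is actually sent, and then aborts with an `IllegalArgumentException` wrapping the original `SaxonApiException`. Note that the same wrapped exception is thrown whether or not `close()` succeeds, so a failed `close()` is effectively swallowed. One possible tightening, purely a sketch and not the committed code, keeps the `close()` failure attached via `addSuppressed`:

```java
import java.io.Closeable;
import java.io.IOException;

public class CompileErrorSketch {

	// hypothetical stand-in for AggregatorReport (a dnet-hadoop internal)
	interface Report extends Closeable {
		void put(String key, String value);
	}

	static void handleCompileError(Report report, Exception e) {
		report.put(e.getClass().getName(), e.getMessage());
		final IllegalArgumentException wrapped = new IllegalArgumentException("error compiling the XSLT", e);
		try {
			report.close();
		} catch (IOException ex) {
			wrapped.addSuppressed(ex); // keep the close() failure visible on the stack trace
		}
		throw wrapped;
	}
}
```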