diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index 861c343c8..d215dcfed 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -77,11 +77,11 @@ public class TransformSparkJobNode {
 			.ofNullable(parser.get("recordsPerTask"))
 			.map(Integer::valueOf)
 			.orElse(RECORDS_PER_TASK);
+		log.info("recordsPerTask: {}", rpt);
 
 		final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 
 		final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
-
 		log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
 
 		SparkConf conf = new SparkConf();
@@ -120,33 +120,24 @@ public class TransformSparkJobNode {
 
 		final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
 		try (AggregatorReport report = new AggregatorReport(messageSender)) {
-			try {
-				final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
-					.getTransformationPlugin(args, ct, report, isLookUpService);
+			final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
+				.getTransformationPlugin(args, ct, report, isLookUpService);
 
-				JavaRDD<MetadataRecord> mdstore = inputMDStore
-					.javaRDD()
-					.repartition(getRepartitionNumber(totalInput, rpt))
-					.map((Function<MetadataRecord, MetadataRecord>) tr::call)
-					.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
-				saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
+			JavaRDD<MetadataRecord> mdstore = inputMDStore
+				.javaRDD()
+				.repartition(getRepartitionNumber(totalInput, rpt))
+				.map((Function<MetadataRecord, MetadataRecord>) tr::call)
+				.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
+			saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
 
-				log.info("Transformed item {}", ct.getProcessedItems().count());
-				log.info("Total item {}", ct.getTotalItems().count());
-				log.info("Transformation Error item {}", ct.getErrorItems().count());
+			log.info("Transformed item {}", ct.getProcessedItems().count());
+			log.info("Total item {}", ct.getTotalItems().count());
+			log.info("Transformation Error item {}", ct.getErrorItems().count());
 
-				final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
-				writeHdfsFile(
-					spark.sparkContext().hadoopConfiguration(),
-					"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
-			} catch (Throwable e) {
-				log.error("error during record transformation", e);
-				report.put(e.getClass().getName(), e.getMessage());
-				report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
-				report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
-				report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
-				throw e;
-			}
+			final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
+			writeHdfsFile(
+				spark.sparkContext().hadoopConfiguration(),
+				"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
 		}
 	}
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
index d16310cff..ad288b347 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
@@ -1,6 +1,9 @@
 package eu.dnetlib.dhp.transformation.xslt;
 
+import static eu.dnetlib.dhp.common.Constants.*;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
@@ -70,7 +73,13 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord>