diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index ed867c7f2..d215dcfed 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -77,11 +77,11 @@ public class TransformSparkJobNode {
 			.ofNullable(parser.get("recordsPerTask"))
 			.map(Integer::valueOf)
 			.orElse(RECORDS_PER_TASK);
+		log.info("recordsPerTask: {}", rpt);
 
 		final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 
 		final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
-
 		log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
 
 		SparkConf conf = new SparkConf();
@@ -110,9 +110,6 @@ public class TransformSparkJobNode {
 		final String workflowId = args.get("workflowId");
 		log.info("workflowId is {}", workflowId);
 
-		MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
-			.getTransformationPlugin(args, ct, isLookUpService);
-
 		final Dataset<MetadataRecord> inputMDStore = spark
 			.read()
 			.format("parquet")
@@ -123,30 +120,24 @@ public class TransformSparkJobNode {
 
 		final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
 
 		try (AggregatorReport report = new AggregatorReport(messageSender)) {
-			try {
-				JavaRDD<MetadataRecord> mdstore = inputMDStore
-					.javaRDD()
-					.repartition(getRepartitionNumber(totalInput, rpt))
-					.map((Function<MetadataRecord, MetadataRecord>) x::call)
-					.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
-				saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
+			final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
+				.getTransformationPlugin(args, ct, report, isLookUpService);
 
-				log.info("Transformed item {}", ct.getProcessedItems().count());
-				log.info("Total item {}", ct.getTotalItems().count());
-				log.info("Transformation Error item {}", ct.getErrorItems().count());
+			JavaRDD<MetadataRecord> mdstore = inputMDStore
+				.javaRDD()
+				.repartition(getRepartitionNumber(totalInput, rpt))
+				.map((Function<MetadataRecord, MetadataRecord>) tr::call)
+				.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
+			saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
 
-				final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
-				writeHdfsFile(
-					spark.sparkContext().hadoopConfiguration(),
-					"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
-			} catch (Throwable e) {
-				log.error("error during record transformation", e);
-				report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
-				report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
-				report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
-				report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
-				throw e;
-			}
+			log.info("Transformed item {}", ct.getProcessedItems().count());
+			log.info("Total item {}", ct.getTotalItems().count());
+			log.info("Transformation Error item {}", ct.getErrorItems().count());
+
+			final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
+			writeHdfsFile(
+				spark.sparkContext().hadoopConfiguration(),
+				"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
 		}
 	}
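Note on the hunk above: the driver-side catch block that copied the counters into the report is gone, and the AggregatorReport is now handed to the transformation plugin itself. A failure therefore propagates out of the Spark action, while the try-with-resources block still closes the report, which presumably triggers its delivery. A minimal, self-contained Java sketch of that lifecycle follows; ReportSketch is a hypothetical stand-in for AggregatorReport, and its send-on-close behaviour is an assumption inferred from the driver code, not a documented contract.

import java.io.Closeable;

// Hypothetical stand-in for eu.dnetlib.dhp.common.aggregation.AggregatorReport;
// assumed here to ship its collected entries when closed.
class ReportSketch implements Closeable {

	private final StringBuilder entries = new StringBuilder();

	public void put(String key, String value) {
		entries.append(key).append('=').append(value).append('\n');
	}

	@Override
	public void close() {
		// stand-in for sending the report to the DNet message manager
		System.out.println("sending report:\n" + entries);
	}
}

public class ReportLifecycleSketch {

	public static void main(String[] args) {
		try (ReportSketch report = new ReportSketch()) {
			report.put("stage", "transform");
			// with the inner try/catch gone, a failure here simply propagates,
			// yet close() still runs, so the report is sent before the job dies
			throw new IllegalStateException("record transformation failed");
		}
	}
}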
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
index e93f3b518..1d783c145 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
@@ -10,6 +10,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
@@ -27,7 +28,8 @@ public class TransformationFactory {
 	}
 
 	public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
-		final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
+		final Map<String, String> jobArgument, final AggregationCounter counters, final AggregatorReport report,
+		final ISLookUpService isLookupService)
 		throws DnetTransformationException {
 
 		try {
@@ -45,7 +47,7 @@ public class TransformationFactory {
 				transformationRuleId, isLookupService);
 
 			final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
-			return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
+			return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation,
 				vocabularies);
 
 		}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
index 54192a7be..ad288b347 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
@@ -1,16 +1,20 @@
 
 package eu.dnetlib.dhp.transformation.xslt;
 
+import static eu.dnetlib.dhp.common.Constants.*;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 
 import javax.xml.transform.stream.StreamSource;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.api.java.function.MapFunction;
 
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
+import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
 import net.sf.saxon.s9api.*;
@@ -25,9 +29,9 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord> {
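The patch is truncated at the hunk header above, so the body of the XSLTTransformationFunction change is missing. Judging from the new constructor call in TransformationFactory and the driver's .filter(Objects::nonNull), the function plausibly keeps the report as a field and uses it to record per-record failures while returning null for records it drops. The sketch below shows only that shape; the field names, the call() body, and the report.put(value.getId(), ...) keying are assumptions, not the commit's actual code.

import org.apache.spark.api.java.function.MapFunction;

import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;

// Hedged sketch of how the truncated class might thread the report through;
// the Saxon XSLT machinery is elided.
public class XsltFunctionSketch implements MapFunction<MetadataRecord, MetadataRecord> {

	private final AggregationCounter counter;
	private final AggregatorReport report; // new collaborator added by this patch

	public XsltFunctionSketch(final AggregationCounter counter, final AggregatorReport report) {
		this.counter = counter;
		this.report = report;
	}

	@Override
	public MetadataRecord call(final MetadataRecord value) {
		counter.getTotalItems().add(1);
		try {
			// ... run the XSLT transformation and return the rewritten record ...
			counter.getProcessedItems().add(1);
			return value;
		} catch (Throwable e) {
			// record the failure and drop the record: the driver filters out nulls
			counter.getErrorItems().add(1);
			report.put(value.getId(), e.getMessage());
			return null;
		}
	}
}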