From ca4391aa1c5c03ecb0477fa287b77da97e3f9c8b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 2 Feb 2021 12:44:04 +0100 Subject: [PATCH] minor changes --- .../transformation/TransformSparkJobNode.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index 0a01faf1e..51f69de10 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -2,8 +2,7 @@ package eu.dnetlib.dhp.transformation; import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*; -import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.saveDataset; -import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.writeTotalSizeOnHDFS; +import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; @@ -19,8 +18,6 @@ import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; import eu.dnetlib.dhp.aggregation.common.AggregationCounter; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -52,11 +49,14 @@ public class TransformSparkJobNode { final String mdstoreInputVersion = parser.get("mdstoreInputVersion"); final String mdstoreOutputVersion = parser.get("mdstoreOutputVersion"); - // TODO this variable will be used after implementing Messaging with DNet Aggregator - final ObjectMapper jsonMapper = new ObjectMapper(); - final MDStoreVersion nativeMdStoreVersion = jsonMapper.readValue(mdstoreInputVersion, MDStoreVersion.class); - final MDStoreVersion cleanedMdStoreVersion = jsonMapper.readValue(mdstoreOutputVersion, MDStoreVersion.class); + final MDStoreVersion nativeMdStoreVersion = MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class); + final String inputPath = nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH; + log.info("input path: {}", inputPath); + + final MDStoreVersion cleanedMdStoreVersion = MAPPER.readValue(mdstoreOutputVersion, MDStoreVersion.class); + final String outputPath = cleanedMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH; + log.info("output path: {}", outputPath); final String isLookupUrl = parser.get("isLookupUrl"); log.info(String.format("isLookupUrl: %s", isLookupUrl)); @@ -74,9 +74,10 @@ public class TransformSparkJobNode { runWithSparkSession( conf, isSparkSessionManaged, - spark -> transformRecords( - parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH, - cleanedMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH)); + spark -> { + transformRecords( + parser.getObjectMap(), isLookupService, spark, inputPath, outputPath); + }); } public static void transformRecords(final Map args, final ISLookUpService isLookUpService,