[cleaning] align parameter names

This commit is contained in:
Claudio Atzori 2022-12-20 21:43:59 +01:00
parent 5816ded93f
commit 33bdad104e
1 changed files with 6 additions and 6 deletions

View File

@ -47,8 +47,8 @@ public class CleanContextSparkJob implements Serializable {
String inputPath = parser.get("inputPath"); String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath); log.info("inputPath: {}", inputPath);
String workingPath = parser.get("workingPath"); String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
String contextId = parser.get("contextId"); String contextId = parser.get("contextId");
log.info("contextId: {}", contextId); log.info("contextId: {}", contextId);
@ -67,12 +67,12 @@ public class CleanContextSparkJob implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingDir);
}); });
} }
private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam, private static <T extends Result> void cleanContext(SparkSession spark, String contextId, String verifyParam,
String inputPath, Class<T> entityClazz, String workingPath) { String inputPath, Class<T> entityClazz, String workingDir) {
Dataset<T> res = spark Dataset<T> res = spark
.read() .read()
.textFile(inputPath) .textFile(inputPath)
@ -106,11 +106,11 @@ public class CleanContextSparkJob implements Serializable {
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(workingPath); .json(workingDir);
spark spark
.read() .read()
.textFile(workingPath) .textFile(workingDir)
.map( .map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz), (MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, entityClazz),
Encoders.bean(entityClazz)) Encoders.bean(entityClazz))