diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java index 55fdbac59..10a3d4465 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java @@ -47,8 +47,8 @@ public class CleanContextSparkJob implements Serializable { String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); String contextId = parser.get("contextId"); log.info("contextId: {}", contextId); @@ -67,12 +67,12 @@ public class CleanContextSparkJob implements Serializable { isSparkSessionManaged, spark -> { - cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); + cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingDir); }); } private static void cleanContext(SparkSession spark, String contextId, String verifyParam, - String inputPath, Class entityClazz, String workingPath) { + String inputPath, Class entityClazz, String workingDir) { Dataset res = spark .read() .textFile(inputPath) @@ -106,11 +106,11 @@ public class CleanContextSparkJob implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath); + .json(workingDir); spark .read() - .textFile(workingPath) + .textFile(workingDir) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz))