diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java index 3d526ec695..8f12fcf3df 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob3.java @@ -71,20 +71,15 @@ public class SparkCountryPropagationJob3 { Class resultClazz = (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - runWithSparkSession( conf, isSparkSessionManaged, - spark -> { - - execPropagation( - spark, - inputPath, - outputPath, - resultClazz, - saveGraph); - }); + spark -> execPropagation( + spark, + inputPath, + outputPath, + resultClazz, + saveGraph)); } private static void execPropagation( @@ -101,8 +96,8 @@ public class SparkCountryPropagationJob3 { spark .read() .json(inputPath) - .as(Encoders.kryo(resultClazz)) - .groupByKey((MapFunction) result1 -> result1.getId(), Encoders.STRING()) + .as(Encoders.bean(resultClazz)) + .groupByKey((MapFunction) r -> r.getId(), Encoders.STRING()) .mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz)) .write() .option("compression", "gzip")