diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java index 58bc741de..059e388db 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob2.java @@ -54,14 +54,17 @@ public class SparkCountryPropagationJob2 { final String datasourcecountrypath = parser.get("preparedInfoPath"); log.info("preparedInfoPath: {}", datasourcecountrypath); - final String possibleUpdatesPath = datasourcecountrypath - .substring(0, datasourcecountrypath.lastIndexOf("/") + 1) - + "possibleUpdates"; - log.info("possibleUpdatesPath: {}", possibleUpdatesPath); - final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + final String possibleUpdatesPath = datasourcecountrypath + .substring(0, datasourcecountrypath.lastIndexOf("/") + 1) + + "possibleUpdates/" + resultType; + log.info("possibleUpdatesPath: {}", possibleUpdatesPath); + final Boolean saveGraph = Optional .ofNullable(parser.get("saveGraph")) .map(Boolean::valueOf) @@ -219,12 +222,12 @@ public class SparkCountryPropagationJob2 { // Encoders.bean(resultClazz)); log.info("Saving graph table to path: {}", outputPath); - // log.info("number of saved recordsa: {}", new_table.count()); + log.info("number of saved recordsa: {}", new_table.count()); new_table.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(outputPath); } - private static Dataset getPotentialResultToUpdate( + private static Dataset getPotentialResultToUpdate( SparkSession spark, String inputPath, Class resultClazz, @@ -237,7 +240,7 @@ public class SparkCountryPropagationJob2 { return countryPropagationAssoc(spark, datasourcecountryassoc); } - private static Dataset countryPropagationAssoc( + private static Dataset countryPropagationAssoc( SparkSession spark, Dataset datasource_country) { @@ -256,7 +259,19 @@ public class SparkCountryPropagationJob2 { + "JOIN cfhb " + " ON hb = dataSourceId ) tmp " + "GROUP BY id"; - Dataset potentialUpdates = spark.sql(query); + + Dataset potentialUpdates = spark + .sql(query) + .as(Encoders.bean(ResultCountrySet.class)) + .map((MapFunction) r -> { + final ArrayList c = r + .getCountrySet() + .stream() + .limit(100) + .collect(Collectors.toCollection(ArrayList::new)); + r.setCountrySet(c); + return r; + }, Encoders.bean(ResultCountrySet.class)); // log.info("potential update number : {}", potentialUpdates.count()); return potentialUpdates; } @@ -267,7 +282,8 @@ public class SparkCountryPropagationJob2 { .read() .textFile(relationPath) .map( - value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, DatasourceCountry.class), Encoders.bean(DatasourceCountry.class)); } }