Changes in the construction of the new country set

This commit is contained in:
Miriam Baglioni 2020-05-07 10:01:34 +02:00
parent 55e825acd4
commit 29bc8c44b1
1 changed files with 26 additions and 10 deletions

View File

@ -54,14 +54,17 @@ public class SparkCountryPropagationJob2 {
final String datasourcecountrypath = parser.get("preparedInfoPath"); final String datasourcecountrypath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", datasourcecountrypath); log.info("preparedInfoPath: {}", datasourcecountrypath);
final String possibleUpdatesPath = datasourcecountrypath
.substring(0, datasourcecountrypath.lastIndexOf("/") + 1)
+ "possibleUpdates";
log.info("possibleUpdatesPath: {}", possibleUpdatesPath);
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
log.info("resultType: {}", resultType);
final String possibleUpdatesPath = datasourcecountrypath
.substring(0, datasourcecountrypath.lastIndexOf("/") + 1)
+ "possibleUpdates/" + resultType;
log.info("possibleUpdatesPath: {}", possibleUpdatesPath);
final Boolean saveGraph = Optional final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph")) .ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf) .map(Boolean::valueOf)
@ -219,12 +222,12 @@ public class SparkCountryPropagationJob2 {
// Encoders.bean(resultClazz)); // Encoders.bean(resultClazz));
log.info("Saving graph table to path: {}", outputPath); log.info("Saving graph table to path: {}", outputPath);
// log.info("number of saved recordsa: {}", new_table.count()); log.info("number of saved recordsa: {}", new_table.count());
new_table.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(outputPath); new_table.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(outputPath);
} }
private static <R extends Result> Dataset<Row> getPotentialResultToUpdate( private static <R extends Result> Dataset<ResultCountrySet> getPotentialResultToUpdate(
SparkSession spark, SparkSession spark,
String inputPath, String inputPath,
Class<R> resultClazz, Class<R> resultClazz,
@ -237,7 +240,7 @@ public class SparkCountryPropagationJob2 {
return countryPropagationAssoc(spark, datasourcecountryassoc); return countryPropagationAssoc(spark, datasourcecountryassoc);
} }
private static Dataset<Row> countryPropagationAssoc( private static Dataset<ResultCountrySet> countryPropagationAssoc(
SparkSession spark, SparkSession spark,
Dataset<DatasourceCountry> datasource_country) { Dataset<DatasourceCountry> datasource_country) {
@ -256,7 +259,19 @@ public class SparkCountryPropagationJob2 {
+ "JOIN cfhb " + "JOIN cfhb "
+ " ON hb = dataSourceId ) tmp " + " ON hb = dataSourceId ) tmp "
+ "GROUP BY id"; + "GROUP BY id";
Dataset<Row> potentialUpdates = spark.sql(query);
Dataset<ResultCountrySet> potentialUpdates = spark
.sql(query)
.as(Encoders.bean(ResultCountrySet.class))
.map((MapFunction<ResultCountrySet, ResultCountrySet>) r -> {
final ArrayList<CountrySbs> c = r
.getCountrySet()
.stream()
.limit(100)
.collect(Collectors.toCollection(ArrayList::new));
r.setCountrySet(c);
return r;
}, Encoders.bean(ResultCountrySet.class));
// log.info("potential update number : {}", potentialUpdates.count()); // log.info("potential update number : {}", potentialUpdates.count());
return potentialUpdates; return potentialUpdates;
} }
@ -267,7 +282,8 @@ public class SparkCountryPropagationJob2 {
.read() .read()
.textFile(relationPath) .textFile(relationPath)
.map( .map(
value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), (MapFunction<String, DatasourceCountry>) value -> OBJECT_MAPPER
.readValue(value, DatasourceCountry.class),
Encoders.bean(DatasourceCountry.class)); Encoders.bean(DatasourceCountry.class));
} }
} }