[country propagation] fixes error 'cannot resolve countrySet given input columns: []' when there is no prepared information driving the propagation process for a given result type
This commit is contained in:
parent
699736addc
commit
3800361033
|
@ -14,6 +14,7 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -84,11 +85,12 @@ public class SparkCountryPropagationJob {
|
|||
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
|
||||
|
||||
log.info("Reading prepared info: {}", preparedInfoPath);
|
||||
Dataset<ResultCountrySet> prepared = spark
|
||||
final Dataset<Row> preparedInfoRaw = spark
|
||||
.read()
|
||||
.json(preparedInfoPath)
|
||||
.as(Encoders.bean(ResultCountrySet.class));
|
||||
.json(preparedInfoPath);
|
||||
|
||||
if (!preparedInfoRaw.isEmpty()) {
|
||||
final Dataset<ResultCountrySet> prepared = preparedInfoRaw.as(Encoders.bean(ResultCountrySet.class));
|
||||
res
|
||||
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
|
||||
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
|
||||
|
@ -96,7 +98,13 @@ public class SparkCountryPropagationJob {
|
|||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
|
||||
} else {
|
||||
res
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
}
|
||||
}
|
||||
|
||||
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
||||
|
|
Loading…
Reference in New Issue