Master branch updates from beta September 2023 #337
|
@ -75,17 +75,17 @@ public class SparkAtomicActionUsageJob implements Serializable {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats");
|
||||
prepareData(dbname, spark, workingPath + "/projectDb", "project_stats");
|
||||
prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats");
|
||||
prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id");
|
||||
prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
|
||||
prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id");
|
||||
writeActionSet(spark, workingPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName) {
|
||||
private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) {
|
||||
spark
|
||||
.sql(
|
||||
"Select result_id, downloads, views " +
|
||||
"Select " + attribute_name + " as id, downloads, views " +
|
||||
"from " + dbname + "." + tableName)
|
||||
.as(Encoders.bean(UsageStatsModel.class))
|
||||
.write()
|
||||
|
|
Loading…
Reference in New Issue