[UsageCount] extend the code to also include the name of the datasource

Miriam Baglioni 2024-01-29 18:12:33 +01:00
parent e9131f4e4a
commit a418dacb47
3 changed files with 33 additions and 4 deletions

SparkAtomicActionUsageJob.java

@@ -14,6 +14,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -70,12 +71,19 @@ public class SparkAtomicActionUsageJob implements Serializable {
 		final String workingPath = parser.get("workingPath");
+		final String datasourcePath = parser.get("datasourcePath");
 
 		runWithSparkHiveSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
 				removeOutputDir(spark, outputPath);
-				prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id");
+				prepareResultData(
+					dbname, spark, workingPath + "/usageDb",
+					"usage_stats",
+					"result_id",
+					"repository_id",
+					datasourcePath);
 				prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
 				prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
 				writeActionSet(spark, workingPath, outputPath);
@@ -83,8 +91,9 @@ public class SparkAtomicActionUsageJob implements Serializable {
 	}
 
 	private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName,
-		String resultAttributeName, String datasourceAttributeName) {
-		spark
+		String resultAttributeName, String datasourceAttributeName,
+		String datasourcePath) {
+		Dataset<UsageStatsResultModel> resultModel = spark
 			.sql(
 				String
 					.format(
@@ -92,7 +101,20 @@ public class SparkAtomicActionUsageJob implements Serializable {
 						"from %s.%s group by %s, %s",
 						resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName,
 						datasourceAttributeName))
-			.as(Encoders.bean(UsageStatsResultModel.class))
+			.as(Encoders.bean(UsageStatsResultModel.class));
+		Dataset<Datasource> datasource = readPath(spark, datasourcePath, Datasource.class)
+			.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
+			.map((MapFunction<Datasource, Datasource>) d -> {
+				d.setId(d.getId().substring(3));
+				return d;
+			}, Encoders.bean(Datasource.class));
+		resultModel
+			.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
+			.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
+				UsageStatsResultModel usrm = t2._1();
+				usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
+				return usrm;
+			}, Encoders.bean(UsageStatsResultModel.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")

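What the change does: prepareResultData now reads the graph datasource entities from datasourcePath, drops the ones deleted by inference, strips the first three characters from the datasource id (presumably the "10|" entity-type prefix, so it lines up with the bare repository_id coming from the usage-stats db), and left-joins them with the usage rows to rewrite datasourceId as "<id>||<official name>". Below is a minimal, self-contained sketch of the same joinWith/map pattern; Usage and Ds are simplified stand-ins for the UsageStatsResultModel and Datasource beans, and the null guard is an addition of the sketch (the committed code dereferences t2._2() unconditionally, which would throw a NullPointerException for usage rows whose datasource id finds no match in the left join).

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class UsageDatasourceJoinSketch {

	// Stand-in for UsageStatsResultModel: only the field touched by the commit.
	public static class Usage implements Serializable {
		private String datasourceId;

		public String getDatasourceId() { return datasourceId; }

		public void setDatasourceId(String datasourceId) { this.datasourceId = datasourceId; }
	}

	// Stand-in for the graph Datasource entity, with officialname flattened to a plain String.
	public static class Ds implements Serializable {
		private String id;
		private String officialname;

		public String getId() { return id; }

		public void setId(String id) { this.id = id; }

		public String getOfficialname() { return officialname; }

		public void setOfficialname(String officialname) { this.officialname = officialname; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("join-sketch").getOrCreate();

		Usage u = new Usage();
		u.setDatasourceId("opendoar____::abc"); // id as recorded in the usage-stats db (illustrative value)

		Ds d = new Ds();
		d.setId("opendoar____::abc"); // graph id with the entity-type prefix already stripped
		d.setOfficialname("Example Repository");

		Dataset<Usage> usage = spark.createDataset(Arrays.asList(u), Encoders.bean(Usage.class));
		Dataset<Ds> datasource = spark.createDataset(Arrays.asList(d), Encoders.bean(Ds.class));

		usage
			.joinWith(datasource, usage.col("datasourceId").equalTo(datasource.col("id")), "left")
			.map((MapFunction<Tuple2<Usage, Ds>, Usage>) t2 -> {
				Usage row = t2._1();
				// with a "left" join, t2._2() is null when no datasource matches
				if (t2._2() != null) {
					row.setDatasourceId(row.getDatasourceId() + "||" + t2._2().getOfficialname());
				}
				return row;
			}, Encoders.bean(Usage.class))
			.show(false); // -> opendoar____::abc||Example Repository

		spark.stop();
	}
}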
input parameters definition (JSON)

@@ -28,5 +28,11 @@
     "paramLongName": "workingPath",
     "paramDescription": "the workingPath where to save the content of the usage_stats table",
     "paramRequired": true
+  },
+  {
+    "paramName": "dp",
+    "paramLongName": "datasourcePath",
+    "paramDescription": "the path to the graph datasource entities",
+    "paramRequired": true
   }
 ]

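The new entry mirrors the existing ones: short name dp, long name datasourcePath, required. A minimal sketch of how such a definition is consumed, assuming eu.dnetlib.dhp.application.ArgumentApplicationParser (the helper the job's main method uses) accepts both option forms; the inlined JSON and the path are illustrative only:

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class DatasourcePathParamSketch {

	public static void main(String[] args) throws Exception {
		// single-entry version of the parameter file above, inlined for the sketch
		String json = "[{\"paramName\":\"dp\", \"paramLongName\":\"datasourcePath\", "
			+ "\"paramDescription\":\"the path to the graph datasource entities\", \"paramRequired\":true}]";

		ArgumentApplicationParser parser = new ArgumentApplicationParser(json);
		parser.parseArgument(new String[] { "--datasourcePath", "/tmp/graph/datasource" }); // hypothetical path

		System.out.println(parser.get("datasourcePath")); // prints /tmp/graph/datasource
	}
}

The short form (-dp) should resolve to the same value as the long form used in the workflow below.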
Oozie workflow.xml

@@ -90,6 +90,7 @@
 				<arg>--outputPath</arg><arg>${outputPath}</arg>
 				<arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
 				<arg>--workingPath</arg><arg>${workingDir}</arg>
+				<arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
 			</spark>
 			<ok to="End"/>
 			<error to="Kill"/>