From a418dacb47fba75af4abd94ada4bd53c869eea94 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 29 Jan 2024 18:12:33 +0100
Subject: [PATCH] [UsageCount] code extension to also include the name of the datasource

---
 .../usagestats/SparkAtomicActionUsageJob.java | 30 ++++++++++++++++---
 .../usagestats/input_actionset_parameter.json |  6 ++++
 .../usagestats/oozie_app/workflow.xml         |  1 +
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
index 01a30ac4a..d6b52ad9b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
@@ -14,6 +14,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -70,12 +71,19 @@ public class SparkAtomicActionUsageJob implements Serializable {
 
         final String workingPath = parser.get("workingPath");
 
+        final String datasourcePath = parser.get("datasourcePath");
+
         runWithSparkHiveSession(
             conf,
             isSparkSessionManaged,
             spark -> {
                 removeOutputDir(spark, outputPath);
-                prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id");
+                prepareResultData(
+                    dbname, spark, workingPath + "/usageDb",
+                    "usage_stats",
+                    "result_id",
+                    "repository_id",
+                    datasourcePath);
                 prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
                 prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
                 writeActionSet(spark, workingPath, outputPath);
@@ -83,8 +91,9 @@ public class SparkAtomicActionUsageJob implements Serializable {
     }
 
     private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName,
-        String resultAttributeName, String datasourceAttributeName) {
-        spark
+        String resultAttributeName, String datasourceAttributeName,
+        String datasourcePath) {
+        Dataset<UsageStatsResultModel> resultModel = spark
             .sql(
                 String
                     .format(
@@ -92,7 +101,20 @@ public class SparkAtomicActionUsageJob implements Serializable {
                         "from %s.%s group by %s, %s",
                         resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName,
                         datasourceAttributeName))
-            .as(Encoders.bean(UsageStatsResultModel.class))
+            .as(Encoders.bean(UsageStatsResultModel.class));
+        Dataset<Datasource> datasource = readPath(spark, datasourcePath, Datasource.class)
+            .filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
+            .map((MapFunction<Datasource, Datasource>) d -> {
+                d.setId(d.getId().substring(3));
+                return d;
+            }, Encoders.bean(Datasource.class));
+        resultModel
+            .joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
+            .map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
+                UsageStatsResultModel usrm = t2._1();
+                usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
+                return usrm;
+            }, Encoders.bean(UsageStatsResultModel.class))
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
index e9200d3ad..a42817f05 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
@@ -28,5 +28,11 @@
     "paramLongName": "workingPath",
     "paramDescription": "the workingPath where to save the content of the usage_stats table",
     "paramRequired": true
+  },
+  {
+    "paramName": "dp",
+    "paramLongName": "datasourcePath",
+    "paramDescription": "the path where the graph Datasource entities are stored",
+    "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
index de188718a..88f259519 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
@@ -90,6 +90,7 @@
             <arg>--outputPath</arg><arg>${outputPath}</arg>
             <arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
             <arg>--workingPath</arg><arg>${workingDir}</arg>
+            <arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
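
A note on the left join added in SparkAtomicActionUsageJob: with join type "left", usage records whose datasourceId has no matching Datasource entity produce a null on the right-hand side of the Tuple2, so calling getOfficialname() on it would throw a NullPointerException. The following is a minimal sketch of a null-safe variant of that map step, assuming the UsageStatsResultModel and Datasource beans used by the patch; the helper name enrichWithDatasourceName is hypothetical and not part of the change.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

import eu.dnetlib.dhp.schema.oaf.Datasource;

import scala.Tuple2;

    // Sketch only (assumes it lives next to UsageStatsResultModel): same join as in the
    // patch, but tolerant of usage records whose datasourceId does not resolve to a
    // Datasource entity.
    private static Dataset<UsageStatsResultModel> enrichWithDatasourceName(
        Dataset<UsageStatsResultModel> resultModel,
        Dataset<Datasource> datasource) {
        return resultModel
            .joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
            .map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
                UsageStatsResultModel usrm = t2._1();
                Datasource d = t2._2();
                // left join: d is null when no datasource record matches the id
                if (d != null && d.getOfficialname() != null) {
                    usrm.setDatasourceId(usrm.getDatasourceId() + "||" + d.getOfficialname().getValue());
                }
                return usrm;
            }, Encoders.bean(UsageStatsResultModel.class));
    }

An inner join would instead silently drop usage records for unknown datasources; the guard above keeps them with the bare identifier.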