From f803530df6785c8366bbf32943cc5b17a579bac7 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 10 Feb 2023 15:50:56 +0100 Subject: [PATCH 1/2] [UsageCount] fixed query --- .../usagestats/SparkAtomicActionUsageJob.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 3dbda37889..f99f842655 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -75,17 +75,17 @@ public class SparkAtomicActionUsageJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats"); - prepareData(dbname, spark, workingPath + "/projectDb", "project_stats"); - prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats"); + prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id"); + prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id"); + prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id"); writeActionSet(spark, workingPath, outputPath); }); } - private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName) { + private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { spark .sql( - "Select result_id, downloads, views " + + "Select " + attribute_name + " as id, downloads, views " + "from " + dbname + "." + tableName) .as(Encoders.bean(UsageStatsModel.class)) .write() From 5cf902a2b098185cc013fd6065366725351b5d02 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 10 Feb 2023 16:16:37 +0100 Subject: [PATCH 2/2] [UsageCount] changed query to make the sum be computed via sql instead of grouping --- .../usagestats/SparkAtomicActionUsageJob.java | 28 +++++-------------- .../dhp/actionmanager/usagestats/datasourceDb | 9 ++---- .../actionmanager/usagestats/datasourceDb_old | 12 ++++++++ .../dhp/actionmanager/usagestats/projectDb | 9 ++---- .../actionmanager/usagestats/projectDb_old | 12 ++++++++ .../dhp/actionmanager/usagestats/usageDb | 9 ++---- .../dhp/actionmanager/usagestats/usageDb_old | 12 ++++++++ 7 files changed, 52 insertions(+), 39 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index f99f842655..d7ca007084 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -8,15 +8,13 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Optional; - -import eu.dnetlib.dhp.schema.common.MainEntityType; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -85,8 +83,9 @@ public class SparkAtomicActionUsageJob implements Serializable { private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { spark .sql( - "Select " + attribute_name + " as id, downloads, views " + - "from " + dbname + "." + tableName) + "Select " + attribute_name + " as id, sum(downloads) as downloads, sum(views) as views " + + "from " + dbname + "." + tableName + + "group by " + attribute_name) .as(Encoders.bean(UsageStatsModel.class)) .write() .mode(SaveMode.Overwrite) @@ -115,7 +114,7 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { - return getUsageStatsModelDataset(spark, inputPath) + return readPath(spark, inputPath, UsageStatsModel.class) .map((MapFunction) usm -> { Result r = new Result(); r.setId("50|" + usm.getId()); @@ -126,7 +125,7 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { - return getUsageStatsModelDataset(spark, inputPath) + return readPath(spark, inputPath, UsageStatsModel.class) .map((MapFunction) usm -> { Project r = new Project(); r.setId("40|" + usm.getId()); @@ -137,7 +136,7 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsDatasource(SparkSession spark, String inputPath) { - return getUsageStatsModelDataset(spark, inputPath) + return readPath(spark, inputPath, UsageStatsModel.class) .map((MapFunction) usm -> { Datasource r = new Datasource(); r.setId("10|" + usm.getId()); @@ -146,20 +145,7 @@ public class SparkAtomicActionUsageJob implements Serializable { }, Encoders.bean(Datasource.class)); } - private static Dataset getUsageStatsModelDataset(SparkSession spark, String inputPath) { - return readPath(spark, inputPath, UsageStatsModel.class) - .groupByKey((MapFunction) us -> us.getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { - UsageStatsModel first = it.next(); - it.forEachRemaining(us -> { - first.setDownloads(first.getDownloads() + us.getDownloads()); - first.setViews(first.getViews() + us.getViews()); - }); - first.setId(k); - return first; - }, Encoders.bean(UsageStatsModel.class)); - } private static List getMeasure(Long downloads, Long views) { DataInfo dataInfo = OafMapperUtils diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb index 7337ba3e25..efbb4cfbd0 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb @@ -1,12 +1,9 @@ -{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} {"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} {"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} {"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} {"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} {"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} {"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old new file mode 100644 index 0000000000..7337ba3e25 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old @@ -0,0 +1,12 @@ +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb index 0ecab2a829..0b8cd1d709 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb @@ -1,12 +1,9 @@ -{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} {"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} {"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} {"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} {"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} {"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} {"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old new file mode 100644 index 0000000000..0ecab2a829 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old @@ -0,0 +1,12 @@ +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb index eb3290eda0..495ae0fc53 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb @@ -1,12 +1,9 @@ -{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} {"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} {"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} {"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} {"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} {"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} {"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old new file mode 100644 index 0000000000..eb3290eda0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old @@ -0,0 +1,12 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file