diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 3dbda3788..d7ca00708 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -8,15 +8,13 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Optional; - -import eu.dnetlib.dhp.schema.common.MainEntityType; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -75,18 +73,19 @@ public class SparkAtomicActionUsageJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats"); - prepareData(dbname, spark, workingPath + "/projectDb", "project_stats"); - prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats"); + prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id"); + prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id"); + prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id"); writeActionSet(spark, workingPath, outputPath); }); } - private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName) { + private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { spark .sql( - "Select result_id, downloads, views " + - "from " + dbname + "." + tableName) + "Select " + attribute_name + " as id, sum(downloads) as downloads, sum(views) as views " + + "from " + dbname + "." + tableName + + "group by " + attribute_name) .as(Encoders.bean(UsageStatsModel.class)) .write() .mode(SaveMode.Overwrite) @@ -115,7 +114,7 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { - return getUsageStatsModelDataset(spark, inputPath) + return readPath(spark, inputPath, UsageStatsModel.class) .map((MapFunction) usm -> { Result r = new Result(); r.setId("50|" + usm.getId()); @@ -126,7 +125,7 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { - return getUsageStatsModelDataset(spark, inputPath) + return readPath(spark, inputPath, UsageStatsModel.class) .map((MapFunction) usm -> { Project r = new Project(); r.setId("40|" + usm.getId()); @@ -137,7 +136,7 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsDatasource(SparkSession spark, String inputPath) { - return getUsageStatsModelDataset(spark, inputPath) + return readPath(spark, inputPath, UsageStatsModel.class) .map((MapFunction) usm -> { Datasource r = new Datasource(); r.setId("10|" + usm.getId()); @@ -146,20 +145,7 @@ public class SparkAtomicActionUsageJob implements Serializable { }, Encoders.bean(Datasource.class)); } - private static Dataset getUsageStatsModelDataset(SparkSession spark, String inputPath) { - return readPath(spark, inputPath, UsageStatsModel.class) - .groupByKey((MapFunction) us -> us.getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { - UsageStatsModel first = it.next(); - it.forEachRemaining(us -> { - first.setDownloads(first.getDownloads() + us.getDownloads()); - first.setViews(first.getViews() + us.getViews()); - }); - first.setId(k); - return first; - }, Encoders.bean(UsageStatsModel.class)); - } private static List getMeasure(Long downloads, Long views) { DataInfo dataInfo = OafMapperUtils diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb index 7337ba3e2..efbb4cfbd 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb @@ -1,12 +1,9 @@ -{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} {"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} {"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} {"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} {"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} {"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} {"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old new file mode 100644 index 000000000..7337ba3e2 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb_old @@ -0,0 +1,12 @@ +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb index 0ecab2a82..0b8cd1d70 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb @@ -1,12 +1,9 @@ -{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} {"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} {"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} {"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} {"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} {"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} {"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old new file mode 100644 index 000000000..0ecab2a82 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb_old @@ -0,0 +1,12 @@ +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb index eb3290eda..495ae0fc5 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb @@ -1,12 +1,9 @@ -{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} {"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} {"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} {"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} {"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} {"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} {"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old new file mode 100644 index 000000000..eb3290eda --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb_old @@ -0,0 +1,12 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file