From 85e53fad004386fe295751caebe43ad0b7777830 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 9 Feb 2023 18:59:45 +0100 Subject: [PATCH] [UsageCount] addition of usagecount for Projects and datasources. Extention of the action set created for the results with new entities for projects and datasources. Extention of the resource set and modification of the testing class --- .../usagestats/SparkAtomicActionUsageJob.java | 103 +++++++++++++----- .../usagestats/UsageStatsModel.java | 10 +- .../usagestats/oozie_app/workflow.xml | 2 +- .../SparkAtomicActionCountJobTest.java | 45 +++++--- .../dhp/actionmanager/usagestats/datasourceDb | 12 ++ .../dhp/actionmanager/usagestats/projectDb | 12 ++ .../dhp/actionmanager/usagestats/usageDb | 12 ++ .../dhp/actionmanager/usagestats/usagestatsdb | 12 -- pom.xml | 2 +- 9 files changed, 143 insertions(+), 67 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb delete mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 5f099b8f2..3dbda3788 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -9,6 +9,8 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +import eu.dnetlib.dhp.schema.common.MainEntityType; +import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -28,9 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Measure; -import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -76,41 +75,37 @@ public class SparkAtomicActionUsageJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareResults(dbname, spark, workingPath); + prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats"); + prepareData(dbname, spark, workingPath + "/projectDb", "project_stats"); + prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats"); writeActionSet(spark, workingPath, outputPath); }); } - public static void prepareResults(String db, SparkSession spark, String workingPath) { + private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName) { spark - .sql( - "Select result_id, downloads, views " + - "from " + db + ".usage_stats") - .as(Encoders.bean(UsageStatsModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); + .sql( + "Select result_id, downloads, views " + + "from " + dbname + "." + tableName) + .as(Encoders.bean(UsageStatsModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); } + + public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) { - readPath(spark, inputPath, UsageStatsModel.class) - .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { - UsageStatsModel first = it.next(); - it.forEachRemaining(us -> { - first.setDownloads(first.getDownloads() + us.getDownloads()); - first.setViews(first.getViews() + us.getViews()); - }); - - Result res = new Result(); - res.setId("50|" + k); - - res.setMeasures(getMeasure(first.getDownloads(), first.getViews())); - return res; - }, Encoders.bean(Result.class)) - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) + getFinalIndicatorsResult(spark, inputPath+ "/usageDb"). + toJavaRDD(). + map(p -> new AtomicAction(p.getClass(),p)) + .union(getFinalIndicatorsProject(spark, inputPath + "/projectDb") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p ))) + .union(getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) @@ -118,6 +113,54 @@ public class SparkAtomicActionUsageJob implements Serializable { } + private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { + + return getUsageStatsModelDataset(spark, inputPath) + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); + return r; + }, Encoders.bean(Result.class)); + } + + private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { + + return getUsageStatsModelDataset(spark, inputPath) + .map((MapFunction) usm -> { + Project r = new Project(); + r.setId("40|" + usm.getId()); + r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); + return r; + }, Encoders.bean(Project.class)); + } + + private static Dataset getFinalIndicatorsDatasource(SparkSession spark, String inputPath) { + + return getUsageStatsModelDataset(spark, inputPath) + .map((MapFunction) usm -> { + Datasource r = new Datasource(); + r.setId("10|" + usm.getId()); + r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); + return r; + }, Encoders.bean(Datasource.class)); + } + + private static Dataset getUsageStatsModelDataset(SparkSession spark, String inputPath) { + return readPath(spark, inputPath, UsageStatsModel.class) + .groupByKey((MapFunction) us -> us.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { + UsageStatsModel first = it.next(); + it.forEachRemaining(us -> { + first.setDownloads(first.getDownloads() + us.getDownloads()); + first.setViews(first.getViews() + us.getViews()); + }); + first.setId(k); + return first; + + }, Encoders.bean(UsageStatsModel.class)); + } + private static List getMeasure(Long downloads, Long views) { DataInfo dataInfo = OafMapperUtils .dataInfo( diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java index df8a77eb6..07f69b0bb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java @@ -4,16 +4,16 @@ package eu.dnetlib.dhp.actionmanager.usagestats; import java.io.Serializable; public class UsageStatsModel implements Serializable { - private String result_id; + private String id; private Long downloads; private Long views; - public String getResult_id() { - return result_id; + public String getId() { + return id; } - public void setResult_id(String result_id) { - this.result_id = result_id; + public void setId(String id) { + this.id = id; } public Long getDownloads() { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml index d94cf7d53..de188718a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml @@ -89,7 +89,7 @@ --hive_metastore_uris${hiveMetastoreUris} --outputPath${outputPath} --usagestatsdb${usagestatsdb} - --workingPath${workingDir}/usageDb + --workingPath${workingDir} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 8aa718bae..8ff3c36f7 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,6 +8,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -68,24 +69,26 @@ public class SparkAtomicActionCountJobTest { @Test void testMatch() { String usageScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats") .getPath(); SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD tmp = sc + JavaRDD tmp = sc .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) - .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)) - .map(aa -> (Result) aa.getPayload()); + .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); + //.map(aa -> (Result) aa.getPayload()); - Assertions.assertEquals(9, tmp.count()); + Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); + Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); + Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); - tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size())); + tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity)r.getPayload()).getMeasures().size())); tmp .foreach( - r -> r + r -> ((OafEntity)r.getPayload()) .getMeasures() .stream() .forEach( @@ -95,14 +98,14 @@ public class SparkAtomicActionCountJobTest { .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); tmp .foreach( - r -> r + r -> ((OafEntity)r.getPayload()) .getMeasures() .stream() .forEach( m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); tmp .foreach( - r -> r + r -> ((OafEntity)r.getPayload()) .getMeasures() .stream() .forEach( @@ -113,7 +116,7 @@ public class SparkAtomicActionCountJobTest { tmp .foreach( - r -> r + r -> ((OafEntity)r.getPayload()) .getMeasures() .stream() .forEach( @@ -127,7 +130,7 @@ public class SparkAtomicActionCountJobTest { u.getDataInfo().getProvenanceaction().getClassid())))); tmp .foreach( - r -> r + r -> ((OafEntity)r.getPayload()) .getMeasures() .stream() .forEach( @@ -142,7 +145,7 @@ public class SparkAtomicActionCountJobTest { tmp .foreach( - r -> r + r -> ((OafEntity)r.getPayload()) .getMeasures() .stream() .forEach( @@ -157,12 +160,13 @@ public class SparkAtomicActionCountJobTest { Assertions .assertEquals( - 1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); + 1, tmp.filter(r -> ((OafEntity)r.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); Assertions .assertEquals( "0", tmp + .map(r -> ((OafEntity)r.getPayload())) .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) .collect() .get(0) @@ -178,7 +182,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "5", tmp - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .map(r -> ((OafEntity)r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) .collect() .get(0) .getMeasures() @@ -194,7 +199,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "0", tmp - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .map(r -> ((OafEntity)r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) .collect() .get(0) .getMeasures() @@ -209,7 +215,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "1", tmp - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .map(r -> ((OafEntity)r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) .collect() .get(0) .getMeasures() @@ -225,7 +232,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "2", tmp - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .map(r -> ((OafEntity)r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) .collect() .get(0) .getMeasures() @@ -240,7 +248,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "6", tmp - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .map(r -> ((OafEntity)r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) .collect() .get(0) .getMeasures() diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb new file mode 100644 index 000000000..7337ba3e2 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb @@ -0,0 +1,12 @@ +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb new file mode 100644 index 000000000..0ecab2a82 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb @@ -0,0 +1,12 @@ +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb new file mode 100644 index 000000000..eb3290eda --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb @@ -0,0 +1,12 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb deleted file mode 100644 index fee74f697..000000000 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb +++ /dev/null @@ -1,12 +0,0 @@ -{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} -{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} -{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} -{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} -{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} -{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} -{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9b60b9078..839188a62 100644 --- a/pom.xml +++ b/pom.xml @@ -807,7 +807,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [3.15.0] + [3.15.1-SNAPSHOT] [4.0.3] [6.0.5] [3.1.6]