From 5feae77937df688e43f8c17988424dcf04259722 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni" <miriam.baglioni@isti.cnr.it>
Date: Wed, 20 Apr 2022 15:13:09 +0200
Subject: [PATCH] [Measures] last changes to accommodate tests

---
 .../usagestats/SparkAtomicActionUsageJob.java |  23 +-
 .../usagestats/input_actionset_parameter.json |   6 +
 .../usagestats/oozie_app/workflow.xml         |   1 +
 .../SparkAtomicActionCountJobTest.java        | 200 ++++++++++++++----
 4 files changed, 181 insertions(+), 49 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
index 04f861df5..012261c39 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
@@ -94,19 +94,19 @@ public class SparkAtomicActionUsageJob implements Serializable {
 
 	public static void prepareResults(String db, SparkSession spark, String workingPath) {
 		spark
-				.sql(
-						"Select result_id, downloads, views " +
-								"from " + db + ".usage_stats")
-				.as(Encoders.bean(UsageStatsModel.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.option("compression", "gzip")
-				.json(workingPath);
+			.sql(
+				"Select result_id, downloads, views " +
+					"from " + db + ".usage_stats")
+			.as(Encoders.bean(UsageStatsModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
 	}
 
-	public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath){
-			readPath(spark, inputPath, UsageStatsModel.class)
-					.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
+	public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) {
+		readPath(spark, inputPath, UsageStatsModel.class)
+			.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
 				UsageStatsModel first = it.next();
 				it.forEachRemaining(us -> {
@@ -117,7 +117,6 @@ public class SparkAtomicActionUsageJob implements Serializable {
 				Result res = new Result();
 				res.setId("50|" + k);
 
-
 				res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
 				return res;
 			}, Encoders.bean(Result.class))
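
The reformatted prepareActionSet above groups usage rows by result_id and folds each group into a single Result whose id gets the "50|" prefix. Below is a minimal plain-Java sketch of that fold; UsageRow is a hypothetical stand-in for UsageStatsModel, and since the accumulation body inside forEachRemaining is not fully visible in this hunk, summing downloads and views is an assumption consistent with the test expectations later in this patch:

```java
import java.util.*;
import java.util.stream.*;

public class AggregationSketch {

	// Simplified stand-in for the project's UsageStatsModel bean (an assumption).
	static class UsageRow {
		final String resultId;
		final long downloads;
		final long views;

		UsageRow(String resultId, long downloads, long views) {
			this.resultId = resultId;
			this.downloads = downloads;
			this.views = views;
		}
	}

	public static void main(String[] args) {
		List<UsageRow> rows = List.of(
			new UsageRow("doi_________::abc", 2, 3),
			new UsageRow("doi_________::abc", 1, 4));

		// Plain-Java equivalent of groupByKey(result_id) + mapGroups: rows sharing
		// a key are folded together by accumulating downloads and views.
		Map<String, UsageRow> byResult = rows
			.stream()
			.collect(
				Collectors.toMap(
					r -> r.resultId,
					r -> r,
					(a, b) -> new UsageRow(a.resultId, a.downloads + b.downloads, a.views + b.views)));

		// The job then prefixes each key with "50|" and attaches the totals as measures.
		byResult
			.forEach((k, v) -> System.out.printf("50|%s downloads=%d views=%d%n", k, v.downloads, v.views));
	}
}
```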
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
index b8ae24e12..821905da4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
@@ -22,5 +22,11 @@
     "paramLongName": "statsdb",
     "paramDescription": "the name of the db to be used",
     "paramRequired": true
+  },
+  {
+    "paramName": "wp",
+    "paramLongName": "workingPath",
+    "paramDescription": "the workingPath where to save the content of the usage_stats table",
+    "paramRequired": true
   }
 ]
\ No newline at end of file
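
The new workingPath entry makes the save location of the usage_stats dump a required CLI argument. A hedged sketch of how the job would pick it up, assuming the codebase's usual ArgumentApplicationParser pattern (the class name and resource path are taken from this patch; the exact parsing API is an assumption):

```java
import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class WorkingPathArgSketch {

	public static void main(String[] args) throws Exception {
		// Load the parameter definitions this patch extends.
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					WorkingPathArgSketch.class
						.getResourceAsStream(
							"/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json")));
		parser.parseArgument(args);

		// "workingPath" is the paramLongName added above; the workflow passes
		// ${workingDir}/usageDb for it (see the workflow.xml hunk below).
		final String workingPath = parser.get("workingPath");
		System.out.println("workingPath = " + workingPath);
	}
}
```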
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
index 781efb475..5b552ca4b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
@@ -89,6 +89,7 @@
             <arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
             <arg>--outputPath</arg><arg>${outputPath}</arg>
             <arg>--statsdb</arg><arg>${statsdb}</arg>
+            <arg>--workingPath</arg><arg>${workingDir}/usageDb</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
index 33d76893d..7cc9eb326 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
@@ -75,7 +75,7 @@ public class SparkAtomicActionCountJobTest {
 			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
 			.getPath();
 
-		SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath,  workingDir.toString() + "/actionSet");
+		SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
@@ -86,48 +86,174 @@ public class SparkAtomicActionCountJobTest {
 		Assertions.assertEquals(9, tmp.count());
 
 		tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
-		tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
-				m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
-		tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
-				m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
-		tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
-				m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
+		tmp
+			.foreach(
+				r -> r
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
+		tmp
+			.foreach(
+				r -> r
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
+		tmp
+			.foreach(
+				r -> r
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
 
-		tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
-				m.getUnit().stream().forEach(u -> Assertions.assertEquals("measure:usage_counts",
-						u.getDataInfo().getProvenanceaction().getClassid()))));
-		tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
-				m.getUnit().stream().forEach(u -> Assertions.assertEquals("Inferred by OpenAIRE",
-						u.getDataInfo().getProvenanceaction().getClassname()))));
+		tmp
+			.foreach(
+				r -> r
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"measure:usage_counts",
+										u.getDataInfo().getProvenanceaction().getClassid()))));
+		tmp
+			.foreach(
+				r -> r
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"Inferred by OpenAIRE",
+										u.getDataInfo().getProvenanceaction().getClassname()))));
 
-		tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
-				m.getUnit().stream().forEach(u -> Assertions.assertEquals("count",
-						u.getKey()))));
+		tmp
+			.foreach(
+				r -> r
+					.getMeasures()
+					.stream()
+					.forEach(
+						m -> m
+							.getUnit()
+							.stream()
+							.forEach(
+								u -> Assertions
+									.assertEquals(
+										"count",
+										u.getKey()))));
 
-		Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
+		Assertions
+			.assertEquals(
+				1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
 
-		Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0)
-				.getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0)
-				.getUnit().get(0).getValue());
-		Assertions.assertEquals("5", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0)
-				.getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0)
-				.getUnit().get(0).getValue());
+		Assertions
+			.assertEquals(
+				"0",
+				tmp
+					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"5",
+				tmp
+					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
 
-		Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0)
-				.getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0)
-				.getUnit().get(0).getValue());
-		Assertions.assertEquals("1", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0)
-				.getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0)
-				.getUnit().get(0).getValue());
+		Assertions
+			.assertEquals(
+				"0",
+				tmp
+					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"1",
+				tmp
+					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
 
-		Assertions.assertEquals("2", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0)
-				.getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0)
-				.getUnit().get(0).getValue());
-		Assertions.assertEquals("6", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0)
-				.getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0)
-				.getUnit().get(0).getValue());
+		Assertions
+			.assertEquals(
+				"2",
+				tmp
+					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("downloads"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
+		Assertions
+			.assertEquals(
+				"6",
+				tmp
+					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.collect()
+					.get(0)
+					.getMeasures()
+					.stream()
+					.filter(m -> m.getId().equals("views"))
+					.collect(Collectors.toList())
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
 	}
 
-
-
 }
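
For reference, the shape the reformatted assertions verify can be summed up in a short sketch: each Result carries exactly two measures, "downloads" and "views", each holding a single unit whose key is "count" and whose value is the accumulated total. Unit and Measure below are simplified stand-ins for the dhp-schema beans, not the real classes:

```java
import java.util.*;

public class MeasureLayoutSketch {

	// Simplified stand-ins for the dhp-schema measure/key-value beans (assumptions).
	static class Unit {
		final String key;
		final String value;

		Unit(String key, String value) {
			this.key = key;
			this.value = value;
		}
	}

	static class Measure {
		final String id;
		final List<Unit> unit;

		Measure(String id, long total) {
			this.id = id;
			this.unit = List.of(new Unit("count", String.valueOf(total)));
		}
	}

	public static void main(String[] args) {
		// Mirrors what the assertions check for
		// 50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6: two measures,
		// each with a single unit keyed "count".
		List<Measure> measures = List.of(new Measure("downloads", 0), new Measure("views", 5));
		measures.forEach(m -> System.out.println(m.id + " -> " + m.unit.get(0).value));
	}
}
```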