diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 04f861df5..012261c39 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -94,19 +94,19 @@ public class SparkAtomicActionUsageJob implements Serializable { public static void prepareResults(String db, SparkSession spark, String workingPath) { spark - .sql( - "Select result_id, downloads, views " + - "from " + db + ".usage_stats") - .as(Encoders.bean(UsageStatsModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); + .sql( + "Select result_id, downloads, views " + + "from " + db + ".usage_stats") + .as(Encoders.bean(UsageStatsModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); } - public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath){ - readPath(spark, inputPath, UsageStatsModel.class) - .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) + public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) { + readPath(spark, inputPath, UsageStatsModel.class) + .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { UsageStatsModel first = it.next(); it.forEachRemaining(us -> { @@ -117,7 +117,6 @@ public class SparkAtomicActionUsageJob implements Serializable { Result res = new Result(); res.setId("50|" + k); - res.setMeasures(getMeasure(first.getDownloads(), first.getViews())); return res; }, Encoders.bean(Result.class)) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json index b8ae24e12..821905da4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json @@ -22,5 +22,11 @@ "paramLongName": "statsdb", "paramDescription": "the name of the db to be used", "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the workingPath where to save the content of the usage_stats table", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml index 781efb475..5b552ca4b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml @@ -89,6 +89,7 @@ --hive_metastore_uris${hiveMetastoreUris} --outputPath${outputPath} --statsdb${statsdb} + --workingPath${workingDir}/usageDb diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 33d76893d..7cc9eb326 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -75,7 +75,7 @@ public class SparkAtomicActionCountJobTest { .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") .getPath(); - SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); + SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -86,48 +86,174 @@ public class SparkAtomicActionCountJobTest { Assertions.assertEquals(9, tmp.count()); tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size())); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertEquals("measure:usage_counts", - u.getDataInfo().getProvenanceaction().getClassid())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertEquals("Inferred by OpenAIRE", - u.getDataInfo().getProvenanceaction().getClassname())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertEquals("count", - u.getKey())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); - Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); + Assertions + .assertEquals( + 1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); - Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); - Assertions.assertEquals("5", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); + Assertions + .assertEquals( + "0", + tmp + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); - Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); - Assertions.assertEquals("1", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); + Assertions + .assertEquals( + "0", + tmp + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); - Assertions.assertEquals("2", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); - Assertions.assertEquals("6", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); + Assertions + .assertEquals( + "2", + tmp + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); } - - }