[Measures] last changes to accommodate tests

Miriam Baglioni 2022-04-20 15:13:09 +02:00
parent 869407c6e2
commit 5feae77937
4 changed files with 181 additions and 49 deletions

@@ -94,19 +94,19 @@ public class SparkAtomicActionUsageJob implements Serializable {
	public static void prepareResults(String db, SparkSession spark, String workingPath) {
		spark
			.sql(
				"Select result_id, downloads, views " +
					"from " + db + ".usage_stats")
			.as(Encoders.bean(UsageStatsModel.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingPath);
	}

	public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) {
		readPath(spark, inputPath, UsageStatsModel.class)
			.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
			.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
				UsageStatsModel first = it.next();
				it.forEachRemaining(us -> {
@@ -117,7 +117,6 @@ public class SparkAtomicActionUsageJob implements Serializable {
				Result res = new Result();
				res.setId("50|" + k);
				res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
				return res;
			}, Encoders.bean(Result.class))
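
The two methods above implement a two-step flow: prepareResults dumps the result_id/downloads/views columns of the stats db's usage_stats table to workingPath as gzipped JSON, and prepareActionSet reads that dump back and collapses all records sharing a result_id into one Result carrying the download and view measures. Below is a minimal plain-Java sketch of that per-id fold, not the project's Spark code; UsageStats is a hypothetical stand-in for UsageStatsModel, and the sketch assumes the part of the lambda elided by the hunk boundary sums the remaining records onto first.

	import java.util.List;
	import java.util.Map;
	import java.util.stream.Collectors;

	public class UsageFoldSketch {

		// Hypothetical stand-in for the project's UsageStatsModel bean.
		record UsageStats(String resultId, long downloads, long views) {
		}

		// Sum downloads and views per result id, mirroring the groupByKey/mapGroups step above.
		static Map<String, long[]> foldByResult(List<UsageStats> records) {
			return records
				.stream()
				.collect(
					Collectors.toMap(
						UsageStats::resultId,
						us -> new long[] { us.downloads(), us.views() },
						(a, b) -> new long[] { a[0] + b[0], a[1] + b[1] }));
		}

		public static void main(String[] args) {
			Map<String, long[]> folded = foldByResult(
				List.of(
					new UsageStats("r1", 0, 1),
					new UsageStats("r1", 0, 4),
					new UsageStats("r2", 2, 6)));
			// Prints r1: downloads=0, views=5 and r2: downloads=2, views=6.
			folded.forEach((id, dv) -> System.out.println(id + ": downloads=" + dv[0] + ", views=" + dv[1]));
		}
	}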

@@ -22,5 +22,11 @@
		"paramLongName": "statsdb",
		"paramDescription": "the name of the db to be used",
		"paramRequired": true
	},
	{
		"paramName": "wp",
		"paramLongName": "workingPath",
		"paramDescription": "the workingPath where to save the content of the usage_stats table",
		"paramRequired": true
	}
]

@@ -89,6 +89,7 @@
				<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
				<arg>--outputPath</arg><arg>${outputPath}</arg>
				<arg>--statsdb</arg><arg>${statsdb}</arg>
				<arg>--workingPath</arg><arg>${workingDir}/usageDb</arg>
			</spark>
			<ok to="End"/>
			<error to="Kill"/>

@@ -75,7 +75,7 @@ public class SparkAtomicActionCountJobTest {
			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
			.getPath();

		SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");

		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -86,48 +86,174 @@ public class SparkAtomicActionCountJobTest {
		Assertions.assertEquals(9, tmp.count());

		tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
		tmp
			.foreach(
				r -> r
					.getMeasures()
					.stream()
					.forEach(
						m -> m
							.getUnit()
							.stream()
							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
		tmp
			.foreach(
				r -> r
					.getMeasures()
					.stream()
					.forEach(
						m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
		tmp
			.foreach(
				r -> r
					.getMeasures()
					.stream()
					.forEach(
						m -> m
							.getUnit()
							.stream()
							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));

		tmp
			.foreach(
				r -> r
					.getMeasures()
					.stream()
					.forEach(
						m -> m
							.getUnit()
							.stream()
							.forEach(
								u -> Assertions
									.assertEquals(
										"measure:usage_counts",
										u.getDataInfo().getProvenanceaction().getClassid()))));
		tmp
			.foreach(
				r -> r
					.getMeasures()
					.stream()
					.forEach(
						m -> m
							.getUnit()
							.stream()
							.forEach(
								u -> Assertions
									.assertEquals(
										"Inferred by OpenAIRE",
										u.getDataInfo().getProvenanceaction().getClassname()))));
		tmp
			.foreach(
				r -> r
					.getMeasures()
					.stream()
					.forEach(
						m -> m
							.getUnit()
							.stream()
							.forEach(
								u -> Assertions
									.assertEquals(
										"count",
										u.getKey()))));

		Assertions
			.assertEquals(
				1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());

		Assertions
			.assertEquals(
				"0",
				tmp
					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
					.collect()
					.get(0)
					.getMeasures()
					.stream()
					.filter(m -> m.getId().equals("downloads"))
					.collect(Collectors.toList())
					.get(0)
					.getUnit()
					.get(0)
					.getValue());
		Assertions
			.assertEquals(
				"5",
				tmp
					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
					.collect()
					.get(0)
					.getMeasures()
					.stream()
					.filter(m -> m.getId().equals("views"))
					.collect(Collectors.toList())
					.get(0)
					.getUnit()
					.get(0)
					.getValue());

		Assertions
			.assertEquals(
				"0",
				tmp
					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
					.collect()
					.get(0)
					.getMeasures()
					.stream()
					.filter(m -> m.getId().equals("downloads"))
					.collect(Collectors.toList())
					.get(0)
					.getUnit()
					.get(0)
					.getValue());
		Assertions
			.assertEquals(
				"1",
				tmp
					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
					.collect()
					.get(0)
					.getMeasures()
					.stream()
					.filter(m -> m.getId().equals("views"))
					.collect(Collectors.toList())
					.get(0)
					.getUnit()
					.get(0)
					.getValue());

		Assertions
			.assertEquals(
				"2",
				tmp
					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
					.collect()
					.get(0)
					.getMeasures()
					.stream()
					.filter(m -> m.getId().equals("downloads"))
					.collect(Collectors.toList())
					.get(0)
					.getUnit()
					.get(0)
					.getValue());
		Assertions
			.assertEquals(
				"6",
				tmp
					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
					.collect()
					.get(0)
					.getMeasures()
					.stream()
					.filter(m -> m.getId().equals("views"))
					.collect(Collectors.toList())
					.get(0)
					.getUnit()
					.get(0)
					.getValue());
	}
}