From 55ea485783ad1980bf40388dc48e8991db1d02f7 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 30 Jun 2023 18:39:30 +0200
Subject: [PATCH] [UsageCount] split the count for results at the level of the
 datasource. For each indicator, one unit is specified per datasource
 contributing to that indicator value. Within each unit, the key holds the
 datasource id and the value holds the count contributed by that datasource
---
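Note: with this change the measures attached to a result no longer carry a
single "count" unit; each measure holds one key/value pair per contributing
datasource. For the dedup_wf_001 record exercised by testUsageStatsDb2 below,
the two measures would serialize roughly as follows (sketch; the dataInfo
block and the enclosing Result fields are omitted):

  "measures": [
    {"id": "downloads", "unit": [
      {"key": "10|fake1", "value": "0"},
      {"key": "10|fake2", "value": "0"},
      {"key": "10|fake3", "value": "1"}]},
    {"id": "views", "unit": [
      {"key": "10|fake1", "value": "5"},
      {"key": "10|fake2", "value": "1"},
      {"key": "10|fake3", "value": "3"}]}
  ]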
 .../usagestats/SparkAtomicActionUsageJob.java |  76 ++-
 .../usagestats/UsageStatsResultModel.java     |  17 +
 .../SparkAtomicActionCountJobTest.java        | 594 ++++++++++++++----
 .../usagestats/{ => test1}/datasourceDb       |   0
 .../usagestats/{ => test1}/projectDb          |   0
 .../actionmanager/usagestats/test1/usageDb    |   9 +
 .../usagestats/test2/datasourceDb             |   9 +
 .../actionmanager/usagestats/test2/projectDb  |   9 +
 .../actionmanager/usagestats/test2/usageDb    |   9 +
 .../dhp/actionmanager/usagestats/usageDb      |   9 -
 10 files changed, 604 insertions(+), 128 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java
 rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/{ => test1}/datasourceDb (100%)
 rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/{ => test1}/projectDb (100%)
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb
 delete mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
index 9b444c6fa..981da0789 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -14,6 +15,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -73,13 +75,28 @@ public class SparkAtomicActionUsageJob implements Serializable {
             isSparkSessionManaged,
             spark -> {
                 removeOutputDir(spark, outputPath);
-                prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id");
+                prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id");
                 prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
-                prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id");
+                prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
                 writeActionSet(spark, workingPath, outputPath);
             });
     }
 
+    private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) {
+        spark
+            .sql(
+                String
+                    .format(
+                        "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " +
+                            "from %s.%s group by %s, %s",
+                        resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName))
+            .as(Encoders.bean(UsageStatsResultModel.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(workingPath);
+    }
+
     private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) {
         spark
@@ -114,16 +131,55 @@ public class SparkAtomicActionUsageJob implements Serializable {
             .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
     }
 
-
+    public static Measure newMeasureInstance(String id) {
+        Measure m = new Measure();
+        m.setId(id);
+        m.setUnit(new ArrayList<>());
+        return m;
+    }
+
     private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
-        return readPath(spark, inputPath, UsageStatsModel.class)
-            .map((MapFunction<UsageStatsModel, Result>) usm -> {
-                Result r = new Result();
-                r.setId("50|" + usm.getId());
-                r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
-                return r;
-            }, Encoders.bean(Result.class));
+        return readPath(spark, inputPath, UsageStatsResultModel.class)
+            .groupByKey((MapFunction<UsageStatsResultModel, String>) usm -> usm.getId(), Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k, it) -> {
+                Result r = new Result();
+                r.setId("50|" + k);
+                // id = download or view and unit = list of key value pairs
+                Measure download = newMeasureInstance("downloads");
+                Measure view = newMeasureInstance("views");
+                UsageStatsResultModel first = it.next();
+                addCountForDatasource(download, first, view);
+                it.forEachRemaining(usm -> {
+                    addCountForDatasource(download, usm, view);
+                });
+                r.setMeasures(Arrays.asList(download, view));
+                return r;
+            }, Encoders.bean(Result.class))
+//            .map((MapFunction<UsageStatsResultModel, Result>) usm -> {
+//                Result r = new Result();
+//                r.setId("50|" + usm.getId());
+//                r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+//                return r;
+//            }, Encoders.bean(Result.class));
+        ;
+    }
+
+    private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) {
+        DataInfo dataInfo = OafMapperUtils
+            .dataInfo(
+                false,
+                UPDATE_DATA_INFO_TYPE,
+                true,
+                false,
+                OafMapperUtils
+                    .qualifier(
+                        UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
+                        UPDATE_CLASS_NAME,
+                        ModelConstants.DNET_PROVENANCE_ACTIONS,
+                        ModelConstants.DNET_PROVENANCE_ACTIONS),
+                "");
+        download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
+        view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
+    }
 
     private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
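For the usage_stats invocation above, the String.format in prepareResultData
expands (with the configured dbname substituted) to: select result_id as id,
repository_id as datasourceId, sum(downloads) as downloads, sum(views) as views
from <dbname>.usage_stats group by result_id, repository_id — i.e. one row per
(result, datasource) pair. The sketch below (plain JDK only, Java 16+ for the
record; the Row record and the two maps are stand-ins for UsageStatsResultModel
and for the unit lists of the "downloads"/"views" Measures) illustrates the
per-datasource fan-out that getFinalIndicatorsResult and addCountForDatasource
perform for one result id:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UsageCountSketch {

    // Stand-in for UsageStatsResultModel: one row per (result, datasource) pair
    record Row(String id, String datasourceId, long downloads, long views) {
    }

    public static void main(String[] args) {
        // The three rows of test2/usageDb belonging to the same result id
        List<Row> rows = List.of(
            new Row("dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6", "10|fake1", 0, 5),
            new Row("dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6", "10|fake2", 0, 1),
            new Row("dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6", "10|fake3", 1, 3));

        // Stand-ins for the unit lists of the "downloads" and "views" Measures:
        // key = datasource id, value = count rendered as a string
        Map<String, String> downloads = new LinkedHashMap<>();
        Map<String, String> views = new LinkedHashMap<>();

        // Mirrors addCountForDatasource: one unit per datasource, no summing
        // across datasources
        for (Row r : rows) {
            downloads.put(r.datasourceId(), String.valueOf(r.downloads()));
            views.put(r.datasourceId(), String.valueOf(r.views()));
        }

        // Prints {10|fake1=0, 10|fake2=0, 10|fake3=1} and
        // {10|fake1=5, 10|fake2=1, 10|fake3=3}, the triples asserted in
        // testUsageStatsDb2
        System.out.println(downloads);
        System.out.println(views);
    }
}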
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java
new file mode 100644
index 000000000..71d046c6f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java
@@ -0,0 +1,17 @@
+package eu.dnetlib.dhp.actionmanager.usagestats;
+
+/**
+ * @author miriam.baglioni
+ * @Date 30/06/23
+ */
+public class UsageStatsResultModel extends UsageStatsModel {
+    private String datasourceId;
+
+    public String getDatasourceId() {
+        return datasourceId;
+    }
+
+    public void setDatasourceId(String datasourceId) {
+        this.datasourceId = datasourceId;
+    }
+}
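Note: UsageStatsResultModel only adds datasourceId on top of the existing
UsageStatsModel (id, downloads, views), so each JSON row written by
prepareResultData carries the result id, the contributing datasource and the
two sums, e.g. (first row of test1/usageDb below):
{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}.
getFinalIndicatorsResult then groups these rows by id, so every row becomes
exactly one unit in each of the two measures.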
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
index 5982c8820..82728ff9d 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
@@ -8,6 +8,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.schema.oaf.Measure;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -67,9 +68,9 @@ public class SparkAtomicActionCountJobTest {
     }
 
     @Test
-    void testMatch() {
+    void testUsageStatsDb2() {
         String usageScoresPath = getClass()
-            .getResource("/eu/dnetlib/dhp/actionmanager/usagestats")
+            .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test2")
             .getPath();
 
         SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
@@ -81,7 +82,7 @@ public class SparkAtomicActionCountJobTest {
             .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
         // .map(aa -> (Result) aa.getPayload());
 
-        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
+        Assertions.assertEquals(7, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
         Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
         Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
@@ -143,7 +144,7 @@ public class SparkAtomicActionCountJobTest {
                                 "Inferred by OpenAIRE",
                                 u.getDataInfo().getProvenanceaction().getClassname()))));
 
-        tmp
+        tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
             .foreach(
                 r -> ((OafEntity) r.getPayload())
                     .getMeasures()
@@ -159,113 +160,60 @@ public class SparkAtomicActionCountJobTest {
                             u.getKey()))));
 
         Assertions
-            .assertEquals(
-                1,
-                tmp
-                    .filter(
+            .assertEquals(
+                1,
+                tmp
+                    .filter(
+                        r -> ((OafEntity) r.getPayload())
+                            .getId()
+                            .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
+                    .count());
+
+        OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first()
+            .getPayload();
+
+        entity
+            .getMeasures()
+            .stream()
+            .forEach(
+                m -> Assertions.assertEquals(3, m.getUnit().size()));
+
+        Measure downloads = entity.getMeasures()
+            .stream()
+            .filter(m -> m.getId().equals("downloads"))
+            .findFirst()
+            .get();
+
+        Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
+        Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
+        Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
+
+        Measure views = entity.getMeasures()
+            .stream()
+            .filter(m -> m.getId().equals("views"))
+            .findFirst()
+            .get();
+
+        Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue());
+        Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue());
+        Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue());
+
+        tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
+            .foreach(
                 r -> ((OafEntity) r.getPayload())
-                    .getId()
-                    .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
-            .count());
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(
+                                u -> Assertions
+                                    .assertEquals(
                                        "count",
+                                        u.getKey()))));
 
-        Assertions
-            .assertEquals(
-                "0",
-                tmp
-                    .map(r -> ((OafEntity) r.getPayload()))
-                    .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
-                    .collect()
-                    .get(0)
-                    .getMeasures()
-                    .stream()
-                    .filter(m -> m.getId().equals("downloads"))
-                    .collect(Collectors.toList())
-                    .get(0)
-                    .getUnit()
-                    .get(0)
-                    .getValue());
-        Assertions
-            .assertEquals(
-                "5",
-                tmp
-                    .map(r -> ((OafEntity) r.getPayload()))
-                    .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
-                    .collect()
-                    .get(0)
-                    .getMeasures()
-                    .stream()
-                    .filter(m -> m.getId().equals("views"))
-                    .collect(Collectors.toList())
-                    .get(0)
-                    .getUnit()
-                    .get(0)
-                    .getValue());
-
-        Assertions
-            .assertEquals(
-                "0",
-                tmp
-                    .map(r -> ((OafEntity) r.getPayload()))
-                    .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
-                    .collect()
-                    .get(0)
-                    .getMeasures()
-                    .stream()
-                    .filter(m -> m.getId().equals("downloads"))
-                    .collect(Collectors.toList())
-                    .get(0)
-                    .getUnit()
-                    .get(0)
-                    .getValue());
-        Assertions
-            .assertEquals(
-                "1",
-                tmp
-                    .map(r -> ((OafEntity) r.getPayload()))
-                    .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
-                    .collect()
-                    .get(0)
-                    .getMeasures()
-                    .stream()
-                    .filter(m -> m.getId().equals("views"))
-                    .collect(Collectors.toList())
-                    .get(0)
-                    .getUnit()
-                    .get(0)
-                    .getValue());
-
-        Assertions
-            .assertEquals(
-                "2",
-                tmp
-                    .map(r -> ((OafEntity) r.getPayload()))
-                    .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
-                    .collect()
-                    .get(0)
-                    .getMeasures()
-                    .stream()
-                    .filter(m -> m.getId().equals("downloads"))
-                    .collect(Collectors.toList())
-                    .get(0)
-                    .getUnit()
-                    .get(0)
-                    .getValue());
-        Assertions
-            .assertEquals(
-                "6",
-                tmp
-                    .map(r -> ((OafEntity) r.getPayload()))
-                    .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
-                    .collect()
-                    .get(0)
-                    .getMeasures()
-                    .stream()
-                    .filter(m -> m.getId().equals("views"))
-                    .collect(Collectors.toList())
-                    .get(0)
-                    .getUnit()
-                    .get(0)
-                    .getValue());
 
         Assertions
             .assertEquals(
@@ -465,5 +413,433 @@
                     .get(0)
                     .getValue());
     }
+
+    @Test
+    void testMatch() {
+        String usageScoresPath = getClass()
+            .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1")
+            .getPath();
+
+        SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<AtomicAction> tmp = sc
+            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+            .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
+        // .map(aa -> (Result) aa.getPayload());
+
+        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
+        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
+        Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
+
+        tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));
+        tmp
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
+        tmp
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
+        tmp
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
+
+        tmp
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(
+                                u -> Assertions
+                                    .assertEquals(
+                                        "measure:usage_counts",
+                                        u.getDataInfo().getProvenanceaction().getClassid()))));
+        tmp
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(
+                                u -> Assertions
+                                    .assertEquals(
+                                        "Inferred by OpenAIRE",
+                                        u.getDataInfo().getProvenanceaction().getClassname()))));
+
+        tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|"))
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(
+                                u -> Assertions
+                                    .assertEquals(
+                                        "count",
+                                        u.getKey()))));
+
+        tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|"))
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(
+                                u -> Assertions
+                                    .assertEquals(
+                                        "10|fake1",
+                                        u.getKey()))));
+
+        tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|"))
+            .foreach(
+                r -> ((OafEntity) r.getPayload())
+                    .getMeasures()
+                    .stream()
+                    .forEach(
+                        m -> m
+                            .getUnit()
+                            .stream()
+                            .forEach(
+                                u -> Assertions
+                                    .assertEquals(
+                                        "count",
+                                        u.getKey()))));
+
.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> 
r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/datasourceDb similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/datasourceDb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/projectDb similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/projectDb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb new file mode 100644 index 
index 000000000..a636407d0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb
@@ -0,0 +1,9 @@
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
+{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","datasourceId":"10|fake1","downloads":0,"views":1}
+{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","datasourceId":"10|fake1","downloads":1,"views":3}
+{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
+{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
+{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
+{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
+{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
+{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb
new file mode 100644
index 000000000..efbb4cfbd
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb
@@ -0,0 +1,9 @@
+{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
+{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
+{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
+{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
+{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
+{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
+{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
+{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
+{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb
new file mode 100644
index 000000000..0b8cd1d70
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb
@@ -0,0 +1,9 @@
+{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
+{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
+{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
+{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
+{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
+{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
+{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
+{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
+{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
\ No newline at end of file
diff --git
 a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb
new file mode 100644
index 000000000..0712d110e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb
@@ -0,0 +1,9 @@
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5}
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake2","downloads":0,"views":1}
+{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake3","downloads":1,"views":3}
+{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1}
+{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6}
+{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1}
+{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3}
+{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10}
+{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb
deleted file mode 100644
index 495ae0fc5..000000000
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb
+++ /dev/null
@@ -1,9 +0,0 @@
-{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5}
-{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
-{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
-{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
-{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6}
-{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
-{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
-{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10}
-{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
\ No newline at end of file
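A possible follow-up for the test class (hypothetical helper, not part of this
patch): the repeated measure/unit stream chains in the assertions above could
collapse into a small lookup utility. The sketch assumes only the
Measure/KeyValue getters already used throughout this diff:

import java.util.List;

import eu.dnetlib.dhp.schema.oaf.Measure;

public class MeasureTestUtils {

    /**
     * Returns the value of the unit with the given key (a datasource id such as
     * "10|fake1" for result measures, or "count" for project and datasource
     * measures) inside the measure with the given id ("downloads" or "views").
     */
    public static String unitValue(List<Measure> measures, String measureId, String unitKey) {
        return measures
            .stream()
            .filter(m -> m.getId().equals(measureId))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("no measure " + measureId))
            .getUnit()
            .stream()
            .filter(u -> u.getKey().equals(unitKey))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("no unit " + unitKey))
            .getValue();
    }
}

An assertion would then read, e.g.:
Assertions.assertEquals("1", MeasureTestUtils.unitValue(entity.getMeasures(), "downloads", "10|fake3"));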