diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 9b444c6fab..981da07899 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -14,6 +15,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -73,13 +75,28 @@ public class SparkAtomicActionUsageJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id"); + prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id"); prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id"); - prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id"); + prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id"); writeActionSet(spark, workingPath, outputPath); }); } + private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) { + spark + .sql( + String + .format( + "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " + + "from %s.%s group by %s, %s", + resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName)) + .as(Encoders.bean(UsageStatsResultModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { spark @@ -114,16 +131,55 @@ public class SparkAtomicActionUsageJob implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } - + public static Measure newMeasureInstance(String id) { + Measure m = new Measure(); + m.setId(id); + m.setUnit(new ArrayList<>()); + return m; + } private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { - return readPath(spark, inputPath, UsageStatsModel.class) - .map((MapFunction) usm -> { - Result r = new Result(); - r.setId("50|" + usm.getId()); - r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); - return r; - }, Encoders.bean(Result.class)); + return readPath(spark, inputPath, UsageStatsResultModel.class) + .groupByKey((MapFunction) usm -> usm.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k,it) -> { + Result r = new Result(); + r.setId("50|" + k); + //id = download or view and unit = list of key value pairs + Measure download = newMeasureInstance("downloads"); + Measure view = newMeasureInstance("views"); + UsageStatsResultModel first = it.next(); + addCountForDatasource(download, first, view); + it.forEachRemaining(usm -> { + addCountForDatasource(download, usm, view); + }); + r.setMeasures(Arrays.asList(download, view)); + return r; + }, Encoders.bean(Result.class)) +// .map((MapFunction) usm -> { +// Result r = new Result(); +// r.setId("50|" + usm.getId()); +// r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); +// return r; +// }, Encoders.bean(Result.class)); + ; + } + + private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo)); + view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo)); } private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java new file mode 100644 index 0000000000..71d046c6fd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java @@ -0,0 +1,17 @@ +package eu.dnetlib.dhp.actionmanager.usagestats; + +/** + * @author miriam.baglioni + * @Date 30/06/23 + */ +public class UsageStatsResultModel extends UsageStatsModel{ + private String datasourceId ; + + public String getDatasourceId() { + return datasourceId; + } + + public void setDatasourceId(String datasourceId) { + this.datasourceId = datasourceId; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 5982c88201..82728ff9d5 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,6 +8,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.oaf.Measure; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -67,9 +68,9 @@ public class SparkAtomicActionCountJobTest { } @Test - void testMatch() { + void testUsageStatsDb2() { String usageScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/usagestats") + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test2") .getPath(); SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); @@ -81,7 +82,7 @@ public class SparkAtomicActionCountJobTest { .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); // .map(aa -> (Result) aa.getPayload()); - Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); + Assertions.assertEquals(7, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); @@ -143,7 +144,7 @@ public class SparkAtomicActionCountJobTest { "Inferred by OpenAIRE", u.getDataInfo().getProvenanceaction().getClassname())))); - tmp + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) .foreach( r -> ((OafEntity) r.getPayload()) .getMeasures() @@ -159,113 +160,60 @@ public class SparkAtomicActionCountJobTest { u.getKey())))); Assertions - .assertEquals( - 1, - tmp - .filter( + .assertEquals( + 1, + tmp + .filter( + r -> ((OafEntity) r.getPayload()) + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); + + OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first() + .getPayload(); + + entity + .getMeasures() + .stream() + .forEach( + m -> Assertions.assertEquals(3, m.getUnit().size() )); + + Measure downloads = entity.getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .findFirst() + .get(); + + + Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + Measure views = entity.getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .findFirst() + .get(); + + Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( r -> ((OafEntity) r.getPayload()) - .getId() - .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .count()); + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); - Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); Assertions .assertEquals( @@ -465,5 +413,433 @@ public class SparkAtomicActionCountJobTest { .get(0) .getValue()); } + @Test + void testMatch() { + String usageScoresPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1") + .getPath(); + SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); + // .map(aa -> (Result) aa.getPayload()); + + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); + + tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size())); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "10|fake1", + u.getKey())))); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + Assertions + .assertEquals( + 1, + tmp + .filter( + r -> ((OafEntity) r.getPayload()) + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/datasourceDb similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/datasourceDb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/projectDb similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/projectDb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb new file mode 100644 index 0000000000..a636407d0d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb @@ -0,0 +1,9 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5} +{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","datasourceId":"10|fake1","downloads":0,"views":1} +{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","datasourceId":"10|fake1","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb new file mode 100644 index 0000000000..efbb4cfbd0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb @@ -0,0 +1,9 @@ +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} +{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} +{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} +{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb new file mode 100644 index 0000000000..0b8cd1d709 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb @@ -0,0 +1,9 @@ +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} +{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} +{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} +{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb new file mode 100644 index 0000000000..0712d110ed --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb @@ -0,0 +1,9 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake2","downloads":0,"views":1} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake3","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb deleted file mode 100644 index 495ae0fc53..0000000000 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb +++ /dev/null @@ -1,9 +0,0 @@ -{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} -{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} -{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} -{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} -{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} -{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} -{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file