From 4c9bc4c3a58c1983ab2f246e1cbead8103761f45 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 30 Jun 2023 19:05:15 +0200 Subject: [PATCH] refactoring --- .../usagestats/SparkAtomicActionUsageJob.java | 92 +- .../usagestats/UsageStatsResultModel.java | 17 +- .../SparkAtomicActionCountJobTest.java | 893 +++++++++--------- 3 files changed, 520 insertions(+), 482 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 981da0789..01a30ac4a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -82,19 +82,21 @@ public class SparkAtomicActionUsageJob implements Serializable { }); } - private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) { + private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName, String datasourceAttributeName) { spark - .sql( - String - .format( - "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " + - "from %s.%s group by %s, %s", - resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName)) - .as(Encoders.bean(UsageStatsResultModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); + .sql( + String + .format( + "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " + + "from %s.%s group by %s, %s", + resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, + datasourceAttributeName)) + .as(Encoders.bean(UsageStatsResultModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); } private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, @@ -131,30 +133,32 @@ public class SparkAtomicActionUsageJob implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } + public static Measure newMeasureInstance(String id) { Measure m = new Measure(); m.setId(id); m.setUnit(new ArrayList<>()); return m; } + private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { return readPath(spark, inputPath, UsageStatsResultModel.class) - .groupByKey((MapFunction) usm -> usm.getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k,it) -> { - Result r = new Result(); - r.setId("50|" + k); - //id = download or view and unit = list of key value pairs - Measure download = newMeasureInstance("downloads"); - Measure view = newMeasureInstance("views"); - UsageStatsResultModel first = it.next(); - addCountForDatasource(download, first, view); - it.forEachRemaining(usm -> { - addCountForDatasource(download, usm, view); - }); - r.setMeasures(Arrays.asList(download, view)); - return r; - }, Encoders.bean(Result.class)) + .groupByKey((MapFunction) usm -> usm.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { + Result r = new Result(); + r.setId("50|" + k); + // id = download or view and unit = list of key value pairs + Measure download = newMeasureInstance("downloads"); + Measure view = newMeasureInstance("views"); + UsageStatsResultModel first = it.next(); + addCountForDatasource(download, first, view); + it.forEachRemaining(usm -> { + addCountForDatasource(download, usm, view); + }); + r.setMeasures(Arrays.asList(download, view)); + return r; + }, Encoders.bean(Result.class)) // .map((MapFunction) usm -> { // Result r = new Result(); // r.setId("50|" + usm.getId()); @@ -166,20 +170,26 @@ public class SparkAtomicActionUsageJob implements Serializable { private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) { DataInfo dataInfo = OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - ""); - download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo)); - view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo)); + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + download + .getUnit() + .add( + OafMapperUtils + .newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo)); + view + .getUnit() + .add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo)); } private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java index 71d046c6f..994e03bee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java @@ -1,17 +1,18 @@ + package eu.dnetlib.dhp.actionmanager.usagestats; /** * @author miriam.baglioni * @Date 30/06/23 */ -public class UsageStatsResultModel extends UsageStatsModel{ - private String datasourceId ; +public class UsageStatsResultModel extends UsageStatsModel { + private String datasourceId; - public String getDatasourceId() { - return datasourceId; - } + public String getDatasourceId() { + return datasourceId; + } - public void setDatasourceId(String datasourceId) { - this.datasourceId = datasourceId; - } + public void setDatasourceId(String datasourceId) { + this.datasourceId = datasourceId; + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 82728ff9d..c5a1ec126 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,7 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.oaf.Measure; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Measure; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; @@ -144,7 +144,8 @@ public class SparkAtomicActionCountJobTest { "Inferred by OpenAIRE", u.getDataInfo().getProvenanceaction().getClassname())))); - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) .foreach( r -> ((OafEntity) r.getPayload()) .getMeasures() @@ -160,60 +161,82 @@ public class SparkAtomicActionCountJobTest { u.getKey())))); Assertions - .assertEquals( - 1, - tmp - .filter( - r -> ((OafEntity) r.getPayload()) - .getId() - .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .count()); - - OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first() - .getPayload(); - - entity - .getMeasures() - .stream() - .forEach( - m -> Assertions.assertEquals(3, m.getUnit().size() )); - - Measure downloads = entity.getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .findFirst() - .get(); - - - Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); - - Measure views = entity.getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .findFirst() - .get(); - - Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) - .foreach( + .assertEquals( + 1, + tmp + .filter( r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "count", - u.getKey())))); + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); + OafEntity entity = (OafEntity) tmp + .filter( + aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .first() + .getPayload(); + + entity + .getMeasures() + .stream() + .forEach( + m -> Assertions.assertEquals(3, m.getUnit().size())); + + Measure downloads = entity + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .findFirst() + .get(); + + Assertions + .assertEquals( + String.valueOf(0), + downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(0), + downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(1), + downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + Measure views = entity + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .findFirst() + .get(); + + Assertions + .assertEquals( + String.valueOf(5), + views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(1), + views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(3), + views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); Assertions .assertEquals( @@ -413,19 +436,20 @@ public class SparkAtomicActionCountJobTest { .get(0) .getValue()); } + @Test void testMatch() { String usageScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1") + .getPath(); SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) - .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); // .map(aa -> (Result) aa.getPayload()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); @@ -434,412 +458,415 @@ public class SparkAtomicActionCountJobTest { tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size())); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "measure:usage_counts", - u.getDataInfo().getProvenanceaction().getClassid())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); tmp - .foreach( + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "10|fake1", + u.getKey())))); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + Assertions + .assertEquals( + 1, + tmp + .filter( r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "Inferred by OpenAIRE", - u.getDataInfo().getProvenanceaction().getClassname())))); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "count", - u.getKey())))); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")) - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "10|fake1", - u.getKey())))); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "count", - u.getKey())))); + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); Assertions - .assertEquals( - 1, - tmp - .filter( - r -> ((OafEntity) r.getPayload()) - .getId() - .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .count()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); } }