1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2023-02-22 10:15:25 +01:00
parent 3b876d9327
commit 0c1be41b30
2 changed files with 253 additions and 251 deletions

View File

@ -8,13 +8,12 @@ import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
@ -28,6 +27,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2; import scala.Tuple2;
@ -80,31 +80,31 @@ public class SparkAtomicActionUsageJob implements Serializable {
}); });
} }
private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName,
String attribute_name) {
spark spark
.sql( .sql(String.format(
"Select " + attribute_name + " as id, sum(downloads) as downloads, sum(views) as views " + "select %s as id, sum(downloads) as downloads, sum(views) as views " +
"from " + dbname + "." + tableName + "from %s.%s group by %s", attribute_name, dbname, tableName, attribute_name))
"group by " + attribute_name) .as(Encoders.bean(UsageStatsModel.class))
.as(Encoders.bean(UsageStatsModel.class)) .write()
.write() .mode(SaveMode.Overwrite)
.mode(SaveMode.Overwrite) .option("compression", "gzip")
.option("compression", "gzip") .json(workingPath);
.json(workingPath);
} }
public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) { public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
getFinalIndicatorsResult(spark, inputPath+ "/usageDb"). getFinalIndicatorsResult(spark, inputPath + "/usageDb")
toJavaRDD(). .toJavaRDD()
map(p -> new AtomicAction(p.getClass(),p)) .map(p -> new AtomicAction(p.getClass(), p))
.union(getFinalIndicatorsProject(spark, inputPath + "/projectDb") .union(
.toJavaRDD() getFinalIndicatorsProject(spark, inputPath + "/projectDb")
.map(p -> new AtomicAction(p.getClass(), p ))) .toJavaRDD()
.union(getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb") .map(p -> new AtomicAction(p.getClass(), p)))
.toJavaRDD() .union(
.map(p -> new AtomicAction(p.getClass(), p))) getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb")
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))))
@ -115,38 +115,36 @@ public class SparkAtomicActionUsageJob implements Serializable {
private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) { private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, UsageStatsModel.class) return readPath(spark, inputPath, UsageStatsModel.class)
.map((MapFunction<UsageStatsModel, Result>) usm -> { .map((MapFunction<UsageStatsModel, Result>) usm -> {
Result r = new Result(); Result r = new Result();
r.setId("50|" + usm.getId()); r.setId("50|" + usm.getId());
r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
return r; return r;
}, Encoders.bean(Result.class)); }, Encoders.bean(Result.class));
} }
private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) { private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, UsageStatsModel.class) return readPath(spark, inputPath, UsageStatsModel.class)
.map((MapFunction<UsageStatsModel, Project>) usm -> { .map((MapFunction<UsageStatsModel, Project>) usm -> {
Project r = new Project(); Project p = new Project();
r.setId("40|" + usm.getId()); p.setId("40|" + usm.getId());
r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); p.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
return r; return p;
}, Encoders.bean(Project.class)); }, Encoders.bean(Project.class));
} }
private static Dataset<Datasource> getFinalIndicatorsDatasource(SparkSession spark, String inputPath) { private static Dataset<Datasource> getFinalIndicatorsDatasource(SparkSession spark, String inputPath) {
return readPath(spark, inputPath, UsageStatsModel.class) return readPath(spark, inputPath, UsageStatsModel.class)
.map((MapFunction<UsageStatsModel, Datasource>) usm -> { .map((MapFunction<UsageStatsModel, Datasource>) usm -> {
Datasource r = new Datasource(); Datasource d = new Datasource();
r.setId("10|" + usm.getId()); d.setId("10|" + usm.getId());
r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); d.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
return r; return d;
}, Encoders.bean(Datasource.class)); }, Encoders.bean(Datasource.class));
} }
private static List<Measure> getMeasure(Long downloads, Long views) { private static List<Measure> getMeasure(Long downloads, Long views) {
DataInfo dataInfo = OafMapperUtils DataInfo dataInfo = OafMapperUtils
.dataInfo( .dataInfo(

View File

@ -8,7 +8,6 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkAtomicActionCountJobTest { public class SparkAtomicActionCountJobTest {
@ -79,16 +79,16 @@ public class SparkAtomicActionCountJobTest {
JavaRDD<AtomicAction> tmp = sc JavaRDD<AtomicAction> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
//.map(aa -> (Result) aa.getPayload()); // .map(aa -> (Result) aa.getPayload());
Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity)r.getPayload()).getMeasures().size())); tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));
tmp tmp
.foreach( .foreach(
r -> ((OafEntity)r.getPayload()) r -> ((OafEntity) r.getPayload())
.getMeasures() .getMeasures()
.stream() .stream()
.forEach( .forEach(
@ -98,14 +98,14 @@ public class SparkAtomicActionCountJobTest {
.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
tmp tmp
.foreach( .foreach(
r -> ((OafEntity)r.getPayload()) r -> ((OafEntity) r.getPayload())
.getMeasures() .getMeasures()
.stream() .stream()
.forEach( .forEach(
m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
tmp tmp
.foreach( .foreach(
r -> ((OafEntity)r.getPayload()) r -> ((OafEntity) r.getPayload())
.getMeasures() .getMeasures()
.stream() .stream()
.forEach( .forEach(
@ -116,7 +116,7 @@ public class SparkAtomicActionCountJobTest {
tmp tmp
.foreach( .foreach(
r -> ((OafEntity)r.getPayload()) r -> ((OafEntity) r.getPayload())
.getMeasures() .getMeasures()
.stream() .stream()
.forEach( .forEach(
@ -130,7 +130,7 @@ public class SparkAtomicActionCountJobTest {
u.getDataInfo().getProvenanceaction().getClassid())))); u.getDataInfo().getProvenanceaction().getClassid()))));
tmp tmp
.foreach( .foreach(
r -> ((OafEntity)r.getPayload()) r -> ((OafEntity) r.getPayload())
.getMeasures() .getMeasures()
.stream() .stream()
.forEach( .forEach(
@ -145,7 +145,7 @@ public class SparkAtomicActionCountJobTest {
tmp tmp
.foreach( .foreach(
r -> ((OafEntity)r.getPayload()) r -> ((OafEntity) r.getPayload())
.getMeasures() .getMeasures()
.stream() .stream()
.forEach( .forEach(
@ -160,13 +160,19 @@ public class SparkAtomicActionCountJobTest {
Assertions Assertions
.assertEquals( .assertEquals(
1, tmp.filter(r -> ((OafEntity)r.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); 1,
tmp
.filter(
r -> ((OafEntity) r.getPayload())
.getId()
.equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
.count());
Assertions Assertions
.assertEquals( .assertEquals(
"0", "0",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
.collect() .collect()
.get(0) .get(0)
@ -182,8 +188,8 @@ public class SparkAtomicActionCountJobTest {
.assertEquals( .assertEquals(
"5", "5",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
@ -199,8 +205,8 @@ public class SparkAtomicActionCountJobTest {
.assertEquals( .assertEquals(
"0", "0",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
@ -215,8 +221,8 @@ public class SparkAtomicActionCountJobTest {
.assertEquals( .assertEquals(
"1", "1",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
@ -232,8 +238,8 @@ public class SparkAtomicActionCountJobTest {
.assertEquals( .assertEquals(
"2", "2",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
@ -248,8 +254,8 @@ public class SparkAtomicActionCountJobTest {
.assertEquals( .assertEquals(
"6", "6",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
@ -261,205 +267,203 @@ public class SparkAtomicActionCountJobTest {
.get(0) .get(0)
.getValue()); .getValue());
Assertions
.assertEquals(
"0",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"5",
tmp
.map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"0", "0",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("downloads")) .filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"5", "1",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("views")) .filter(m -> m.getId().equals("views"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"0", "2",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("downloads")) .filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"1", "6",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("views")) .filter(m -> m.getId().equals("views"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"2", "0",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("downloads")) .filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"6", "5",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("views")) .filter(m -> m.getId().equals("views"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"0", "0",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("downloads")) .filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"5", "1",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("views")) .filter(m -> m.getId().equals("views"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"0", "2",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("downloads")) .filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions Assertions
.assertEquals( .assertEquals(
"1", "6",
tmp tmp
.map(r -> ((OafEntity)r.getPayload())) .map(r -> ((OafEntity) r.getPayload()))
.filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect() .collect()
.get(0) .get(0)
.getMeasures() .getMeasures()
.stream() .stream()
.filter(m -> m.getId().equals("views")) .filter(m -> m.getId().equals("views"))
.collect(Collectors.toList()) .collect(Collectors.toList())
.get(0) .get(0)
.getUnit() .getUnit()
.get(0) .get(0)
.getValue()); .getValue());
Assertions
.assertEquals(
"2",
tmp
.map(r -> ((OafEntity)r.getPayload()))
.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("downloads"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
Assertions
.assertEquals(
"6",
tmp
.map(r -> ((OafEntity)r.getPayload()))
.filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f"))
.collect()
.get(0)
.getMeasures()
.stream()
.filter(m -> m.getId().equals("views"))
.collect(Collectors.toList())
.get(0)
.getUnit()
.get(0)
.getValue());
} }
} }