From 35b72791478e42f3e1e1b1f708ddf825acb4dcc5 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 28 May 2020 10:26:12 +0200 Subject: [PATCH] changed test because data are saved as SequenceFile now, and because of the group by the number of produced updates decreases --- .../actionmanager/project/SparkUpdateProjectTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java index f7b8722c4f..64c6ac32f9 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/SparkUpdateProjectTest.java @@ -5,7 +5,9 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import eu.dnetlib.dhp.schema.action.AtomicAction; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -82,10 +84,12 @@ public class SparkUpdateProjectTest { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/actionSet") - .map(item -> OBJECT_MAPPER.readValue(item, Project.class)); + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Project)aa.getPayload())) + ; - Assertions.assertEquals(16, tmp.count()); + Assertions.assertEquals(14, tmp.count()); // Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(CSVProgramme.class)); //