1
0
Fork 0

changed test because data are saved as SequenceFile now, and because of the group by the umber of produced update decrease

This commit is contained in:
Miriam Baglioni 2020-05-28 10:26:12 +02:00
parent 37c155b86a
commit 35b7279147
1 changed files with 7 additions and 3 deletions

View File

@ -5,7 +5,9 @@ import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@ -82,10 +84,12 @@ public class SparkUpdateProjectTest {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Project> tmp = sc JavaRDD<Project> tmp = sc
.textFile(workingDir.toString() + "/actionSet") .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
.map(item -> OBJECT_MAPPER.readValue(item, Project.class)); .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Project)aa.getPayload()))
;
Assertions.assertEquals(16, tmp.count()); Assertions.assertEquals(14, tmp.count());
// Dataset<CSVProgramme> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(CSVProgramme.class)); // Dataset<CSVProgramme> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(CSVProgramme.class));
// //