From 3b9020c1b7803d81ed4a9aaf9acd9a7b58f00b23 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 19 Jan 2022 18:15:55 +0100
Subject: [PATCH] added unit test for the DispatchEntitiesJob

---
 .../group/GroupEntitiesSparkJobTest.java      | 104 +++++++++++++-----
 1 file changed, 78 insertions(+), 26 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
index 729948f52..3bd1c13de 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
@@ -1,6 +1,8 @@
 package eu.dnetlib.dhp.oa.graph.group;
 
+import static org.junit.jupiter.api.Assertions.*;
+
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
@@ -19,22 +21,34 @@ import org.junit.jupiter.api.*;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
 import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class GroupEntitiesSparkJobTest {
 
 	private static SparkSession spark;
 
-	private Path workingDir;
-	private Path graphInputPath;
+	private static ObjectMapper mapper = new ObjectMapper()
+		.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
-	private Path outputPath;
+	private static Path workingDir;
+	private Path dataInputPath;
+
+	private Path groupEntityPath;
+	private Path dispatchEntityPath;
 
 	@BeforeAll
-	public static void beforeAll() {
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
+
 		SparkConf conf = new SparkConf();
 		conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
 		conf.setMaster("local");
@@ -45,48 +59,86 @@ public class GroupEntitiesSparkJobTest {
 
 	@BeforeEach
 	public void beforeEach() throws IOException, URISyntaxException {
-		workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
-		graphInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
-		outputPath = workingDir.resolve("output");
-	}
-
-	@AfterEach
-	public void afterEach() throws IOException {
-		FileUtils.deleteDirectory(workingDir.toFile());
+		dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
+		groupEntityPath = workingDir.resolve("grouped_entity");
+		dispatchEntityPath = workingDir.resolve("dispatched_entity");
 	}
 
 	@AfterAll
-	public static void afterAll() {
+	public static void afterAll() throws IOException {
 		spark.stop();
+		FileUtils.deleteDirectory(workingDir.toFile());
 	}
 
 	@Test
+	@Order(1)
 	void testGroupEntities() throws Exception {
 		GroupEntitiesSparkJob.main(new String[] {
 			"-isSparkSessionManaged",
 			Boolean.FALSE.toString(),
 			"-graphInputPath",
-			graphInputPath.toString(),
+			dataInputPath.toString(),
 			"-outputPath",
-			outputPath.toString()
+			groupEntityPath.toString()
 		});
 
-		ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 		Dataset<Result> output = spark
 			.read()
-			.textFile(outputPath.toString())
+			.textFile(groupEntityPath.toString())
 			.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
 			.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
 
-		Assertions
-			.assertEquals(
-				1,
-				output
-					.filter(
-						(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
-							.equals(r.getId()) &&
-							r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
-					.count());
+		assertEquals(
+			1,
+			output
+				.filter(
+					(FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
+						.equals(r.getId()) &&
+						r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
+				.count());
+	}
+
+	@Test
+	@Order(2)
+	void testDispatchEntities() throws Exception {
+		for (String type : Lists
+			.newArrayList(
+				Publication.class.getCanonicalName(), eu.dnetlib.dhp.schema.oaf.Dataset.class.getCanonicalName())) {
+			String directory = StringUtils.substringAfterLast(type, ".").toLowerCase();
+			DispatchEntitiesSparkJob.main(new String[] {
+				"-isSparkSessionManaged",
+				Boolean.FALSE.toString(),
+				"-inputPath",
+				groupEntityPath.toString(),
+				"-outputPath",
+				dispatchEntityPath.resolve(directory).toString(),
+				"-graphTableClassName",
+				type
+			});
+		}
+
+		Dataset<Result> output = spark
+			.read()
+			.textFile(
+				DHPUtils
+					.toSeq(
+						HdfsSupport
+							.listFiles(dispatchEntityPath.toString(), spark.sparkContext().hadoopConfiguration())))
+			.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
+
+		assertEquals(3, output.count());
+		assertEquals(
+			2,
+			output
+				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
+				.filter((FilterFunction<String>) s -> s.equals("publication"))
+				.count());
+		assertEquals(
+			1,
+			output
+				.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
+				.filter((FilterFunction<String>) s -> s.equals("dataset"))
+				.count());
 	}
 }
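
Reviewer note, not part of the patch: testDispatchEntities reads everything under
dispatchEntityPath back in a single pass and then filters by result type. Because
DispatchEntitiesSparkJob writes each type under a directory named after the
lower-cased simple class name ("publication", "dataset"), the same expectations
could also be checked one directory at a time. The sketch below is hypothetical
(the method name and @Order(3) are invented); it assumes the same fixtures as the
patch (2 publications, 1 dataset) and the fields and static imports of the test
class above.

	// Hypothetical follow-up test: verify each dispatched directory separately,
	// relying on the ordered execution so the dispatched output already exists.
	@Test
	@Order(3)
	void testDispatchedDirectories() {
		// the grouped input is expected to yield 2 publications...
		assertEquals(
			2,
			spark
				.read()
				.textFile(dispatchEntityPath.resolve("publication").toString())
				.count());
		// ...and 1 dataset
		assertEquals(
			1,
			spark
				.read()
				.textFile(dispatchEntityPath.resolve("dataset").toString())
				.count());
	}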