1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2022-01-19 17:17:11 +01:00
parent 391aa1373b
commit abfa9c6045
1 changed files with 65 additions and 64 deletions

View File

@ -1,12 +1,12 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import com.fasterxml.jackson.databind.DeserializationFeature; import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URISyntaxException;
import eu.dnetlib.dhp.common.HdfsSupport; import java.nio.file.Files;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; import java.nio.file.Path;
import eu.dnetlib.dhp.schema.common.ModelSupport; import java.nio.file.Paths;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -17,75 +17,76 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import java.io.IOException; import com.fasterxml.jackson.databind.DeserializationFeature;
import java.net.URISyntaxException; import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Path; import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import java.nio.file.Paths; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
private static SparkSession spark; private static SparkSession spark;
private Path workingDir; private Path workingDir;
//private Path inputDir; private Path graphInputPath;
private Path graphInputPath;
private Path outputPath; private Path outputPath;
@BeforeAll @BeforeAll
public static void beforeAll() { public static void beforeAll() {
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
conf.setMaster("local"); conf.setMaster("local");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate(); spark = SparkSession.builder().config(conf).getOrCreate();
} }
@BeforeEach @BeforeEach
public void beforeEach() throws IOException, URISyntaxException { public void beforeEach() throws IOException, URISyntaxException {
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
//inputDir = workingDir.resolve("input"); graphInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
graphInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); outputPath = workingDir.resolve("output");
outputPath = workingDir.resolve("output"); }
}
@AfterEach @AfterEach
public void afterEach() throws IOException { public void afterEach() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile()); FileUtils.deleteDirectory(workingDir.toFile());
} }
@AfterAll @AfterAll
public static void afterAll() { public static void afterAll() {
spark.stop(); spark.stop();
} }
@Test @Test
void testGroupEntities() throws Exception { void testGroupEntities() throws Exception {
GroupEntitiesSparkJob.main(new String[] { GroupEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged", "-isSparkSessionManaged",
Boolean.FALSE.toString(), Boolean.FALSE.toString(),
"-graphInputPath", "-graphInputPath",
graphInputPath.toString(), graphInputPath.toString(),
"-outputPath", "-outputPath",
outputPath.toString() outputPath.toString()
}); });
ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Dataset<Result> output = spark Dataset<Result> output = spark
.read() .read()
.textFile(outputPath.toString()) .textFile(outputPath.toString())
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) .map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
Assertions.assertEquals( Assertions
1, .assertEquals(
output 1,
.filter((FilterFunction<Result>) r -> output
"50|doi_________::09821844208a5cd6300b2bfb13bca1b9".equals(r.getId()) && .filter(
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")) ) (FilterFunction<Result>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
.count()); .equals(r.getId()) &&
} r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
}
} }