package eu.dnetlib.dhp.oa.graph.dump.skgif;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.skgif.model.Persons;
import eu.dnetlib.dhp.skgif.model.Topic;
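
/**
 * Tests for {@link EmitFromEntities}, the Spark job that emits SKG-IF model records
 * (Persons, topics and per-result manifestations) from a graph dump.
 */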
public class EmitFromEntitiesJobTest {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;

    private static Path workingDir;

    private static final Logger log = LoggerFactory.getLogger(EmitFromEntitiesJobTest.class);
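
    // A single local Spark session is shared by all tests; the temporary working
    // directory also hosts the Spark SQL / Hive warehouse.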
    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files.createTempDirectory(EmitFromEntitiesJobTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
        conf.setAppName(EmitFromEntitiesJobTest.class.getSimpleName());
        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession
            .builder()
            .appName(EmitFromEntitiesJobTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }
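
    // Delete the temporary working directory and stop the shared Spark session.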
    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }
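
    /**
     * Runs the emit job on the sample graph and checks the emitted records: two
     * distinct "Claudia Borer" Persons are kept, ORCID-backed authors get a
     * "person"-prefixed local identifier, five manifestations are emitted and no topics.
     */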
    @Test
    public void testEmitFromResult() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph/")
            .getPath();

        EmitFromEntities
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-sourcePath", sourcePath,
                    "-outputPath", workingDir.toString() + "/result/",
                    "-workingDir", workingDir.toString() + "/"
                });
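
        // Read back the emitted Persons and Topic records from the output path and the
        // intermediate manifestation records from the publication working directory.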
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Persons> persons = sc
            .textFile(workingDir.toString() + "/result/Persons")
            .map(item -> OBJECT_MAPPER.readValue(item, Persons.class));
        JavaRDD<Topic> topics = sc
            .textFile(workingDir.toString() + "/result/Topic")
            .map(item -> OBJECT_MAPPER.readValue(item, Topic.class));
        JavaRDD<EmitPerManifestation> manifestation = sc
            .textFile(workingDir.toString() + "/publication/manifestation")
            .map(item -> OBJECT_MAPPER.readValue(item, EmitPerManifestation.class));

        Dataset<Persons> personsDataset = spark
            .createDataset(persons.rdd(), Encoders.bean(Persons.class));
        personsDataset.show(false);
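
        // Two distinct "Claudia Borer" authors are expected: one still carrying a
        // temp_person_ identifier and one resolved to a different local identifier.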
        Persons claudiaBorer = personsDataset
            .filter(
                (FilterFunction<Persons>) p -> p
                    .getLocal_identifier()
                    .equalsIgnoreCase("temp_person_::2c1eea261f7d9a97ab7ca8c4200781db"))
            .first();
        Assertions
            .assertEquals(
                2,
                personsDataset
                    .filter(
                        (FilterFunction<Persons>) p -> p.getGiven_name().equalsIgnoreCase("claudia")
                            && p.getFamily_name().equalsIgnoreCase("borer"))
                    .count());
        Assertions
            .assertEquals(
                1,
                personsDataset
                    .filter(
                        (FilterFunction<Persons>) p -> p.getGiven_name().equalsIgnoreCase("claudia")
                            && p.getFamily_name().equalsIgnoreCase("borer")
                            && !p
                                .getLocal_identifier()
                                .equalsIgnoreCase("temp_person_::2c1eea261f7d9a97ab7ca8c4200781db"))
                    .count());
        Assertions.assertEquals("claudia", claudiaBorer.getGiven_name().toLowerCase());
        Assertions.assertEquals("borer", claudiaBorer.getFamily_name().toLowerCase());
        Assertions
            .assertEquals(
                2,
                personsDataset
                    .filter((FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person"))
                    .count());
        Assertions
            .assertEquals(
                1,
                personsDataset
                    .filter(
                        (FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person")
                            && p.getIdentifiers().get(0).getValue().equals("0000-0002-5597-4916"))
                    .count());

        Persons orcidPerson = personsDataset
            .filter(
                (FilterFunction<Persons>) p -> p.getLocal_identifier().startsWith("person")
                    && p.getIdentifiers().get(0).getValue().equals("0000-0002-5597-4916"))
            .first();
        Assertions.assertEquals("M.", orcidPerson.getGiven_name());
        Assertions.assertEquals("Kooi", orcidPerson.getFamily_name());
        Assertions.assertEquals(1, orcidPerson.getIdentifiers().size());
        Assertions.assertEquals("orcid", orcidPerson.getIdentifiers().get(0).getScheme());
        Assertions.assertEquals("0000-0002-5597-4916", orcidPerson.getIdentifiers().get(0).getValue());
Dataset<EmitPerManifestation> manifestationDataset = spark
.createDataset(manifestation.rdd(), Encoders.bean(EmitPerManifestation.class));
manifestationDataset.show(false);
Assertions.assertEquals(5, manifestationDataset.count());
Dataset<Topic> topicDataset = spark
.createDataset(topics.rdd(), Encoders.bean(Topic.class));
Assertions.assertEquals(0, topicDataset.count());
}
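
    /**
     * Runs the emit job on a graph with complete entities and checks that three
     * topics are emitted; Persons and manifestations are printed for manual inspection.
     */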
    @Test
    public void testEmitFromResultComplete() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graph_complete_entities/")
            .getPath();

        EmitFromEntities
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-sourcePath", sourcePath,
                    "-outputPath", workingDir.toString() + "/result/",
                    "-workingDir", workingDir.toString() + "/"
                });
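
        // Read back the emitted records; only the Topic count is asserted, the rest
        // is dumped to stdout for manual inspection.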
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Persons> persons = sc
            .textFile(workingDir.toString() + "/result/Persons")
            .map(item -> OBJECT_MAPPER.readValue(item, Persons.class));
        Dataset<Persons> personsDataset = spark
            .createDataset(persons.rdd(), Encoders.bean(Persons.class));
        personsDataset
            .foreach((ForeachFunction<Persons>) p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p)));

        JavaRDD<Topic> topics = sc
            .textFile(workingDir.toString() + "/result/Topic")
            .map(item -> OBJECT_MAPPER.readValue(item, Topic.class));
        Dataset<Topic> topicDataset = spark
            .createDataset(topics.rdd(), Encoders.bean(Topic.class));
        Assertions.assertEquals(3, topicDataset.count());
        topicDataset
            .foreach((ForeachFunction<Topic>) t -> System.out.println(OBJECT_MAPPER.writeValueAsString(t)));

        JavaRDD<EmitPerManifestation> manifestation = sc
            .textFile(workingDir.toString() + "/publication/manifestation")
            .map(item -> OBJECT_MAPPER.readValue(item, EmitPerManifestation.class));
        Dataset<EmitPerManifestation> manifestationDataset = spark
            .createDataset(manifestation.rdd(), Encoders.bean(EmitPerManifestation.class));
        manifestationDataset.show(false);
    }
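
    /**
     * Runs the emit job on the API example subset and dumps the emitted Persons,
     * topics and manifestations for manual inspection; no counts are asserted yet.
     */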
    @Test
    public void testEmitFromResultApiSubset() throws Exception {
        final String sourcePath = getClass()
            .getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graphForAPIExample/")
            .getPath();

        EmitFromEntities
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-sourcePath", sourcePath,
                    "-outputPath", workingDir.toString() + "/result/",
                    "-workingDir", workingDir.toString() + "/"
                });
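
        // Dump every emitted record to stdout for manual inspection of the API example.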
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Persons> persons = sc
            .textFile(workingDir.toString() + "/result/Persons")
            .map(item -> OBJECT_MAPPER.readValue(item, Persons.class));
        Dataset<Persons> personsDataset = spark
            .createDataset(persons.rdd(), Encoders.bean(Persons.class));
        personsDataset
            .foreach((ForeachFunction<Persons>) p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p)));

        JavaRDD<Topic> topics = sc
            .textFile(workingDir.toString() + "/result/Topic")
            .map(item -> OBJECT_MAPPER.readValue(item, Topic.class));
        Dataset<Topic> topicDataset = spark
            .createDataset(topics.rdd(), Encoders.bean(Topic.class));
        // Assertions.assertEquals(3, topicDataset.count());
        topicDataset
            .foreach((ForeachFunction<Topic>) t -> System.out.println(OBJECT_MAPPER.writeValueAsString(t)));

        JavaRDD<EmitPerManifestation> manifestation = sc
            .textFile(workingDir.toString() + "/publication/manifestation")
            .map(item -> OBJECT_MAPPER.readValue(item, EmitPerManifestation.class));
        Dataset<EmitPerManifestation> manifestationDataset = spark
            .createDataset(manifestation.rdd(), Encoders.bean(EmitPerManifestation.class));
        manifestationDataset
            .foreach(
                (ForeachFunction<EmitPerManifestation>) m -> System.out.println(OBJECT_MAPPER.writeValueAsString(m)));
    }
}