From 630a6a1fe7e4a762450d638269a6b6c0f615dbb1 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 24 Aug 2020 16:51:26 +0200 Subject: [PATCH] first tests for the pid graph dump --- .../dhp/oa/graph/dump/pid/DumpAuthorTest.java | 96 +++++++ .../dump/pid/DumpResultRelationTest.java | 161 +++++++++++ .../dhp/oa/graph/dump/pid/DumpResultTest.java | 96 +++++++ .../dump/pid/PreparedInfoCollectTest.java | 168 +++++++++++ .../graph/dump/pid/PreparedInfoPidTest.java | 260 ++++++++++++++++++ 5 files changed, 781 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpAuthorTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultRelationTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoCollectTest.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoPidTest.java diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpAuthorTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpAuthorTest.java new file mode 100644 index 0000000000..61abd3cdd7 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpAuthorTest.java @@ -0,0 +1,96 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; + +public class DumpAuthorTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(DumpAuthorTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DumpAuthorTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(DumpAuthorTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(DumpAuthorTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testDumpAuthorPids() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/pid/result") + .getPath(); + + SparkDumpPidAuthor.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString(), + "-sourcePath", sourcePath, + "-allowedAuthorPids", "[\"orcid\", \"mag\"]" + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/author") + .map(item -> OBJECT_MAPPER.readValue(item, Entity.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Entity.class)); + + verificationDataset.show(false); + Assertions.assertEquals(3, verificationDataset.count()); + + Assertions.assertEquals(1, verificationDataset.filter("id = 'mag:fakemag'").count()); + + Assertions.assertEquals(2, verificationDataset.filter("substr(id,1,5) = 'orcid'").count()); + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultRelationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultRelationTest.java new file mode 100644 index 0000000000..24abdd4159 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultRelationTest.java @@ -0,0 +1,161 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; + +public class DumpResultRelationTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(DumpResultRelationTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DumpResultRelationTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(DumpResultRelationTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(DumpAuthorTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testDumpResultRelationPids() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/pid/preparedInfoDR") + .getPath(); + + SparkDumpResultRelation.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString(), + "-preparedInfoPath", sourcePath + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Relation.class)); + + verificationDataset.show(100, false); + Assertions.assertEquals(32, verificationDataset.count()); + + Assertions + .assertEquals( + 1, verificationDataset + .filter( + "source.id = 'orcid:0000-0001-9317-9350' and " + + "relType.name = 'sameAs' and target.id = 'mag:fakeMag'") + .count()); + + Assertions + .assertEquals( + 4, verificationDataset + .filter( + "source.id = 'orcid:0000-0001-9317-9350' and " + + "relType.name = 'isAuthorOf'") + .count()); + + Assertions + .assertEquals( + 4, verificationDataset + .filter( + "target.id = 'orcid:0000-0001-9317-9350' and " + + "relType.name = 'hasAuthor'") + .count()); + + Assertions + .assertEquals( + 1, verificationDataset + .filter( + "source.id = 'orcid:0000-0001-9317-9350' and " + + "relType.name = 'hasCoAuthor'") + .count()); + + Assertions + .assertEquals( + 1, verificationDataset + .filter( + "source.id = 'orcid:0000-0001-9317-9350' and " + + "relType.name = 'hasCoAuthor' and target.id = 'orcid:0000-0002-1114-4216'") + .count()); + + Assertions + .assertEquals( + 2, verificationDataset + .filter( + "source.id = 'orcid:0000-0002-1114-4216' and " + + "relType.name = 'hasCoAuthor'") + .count()); + + Assertions + .assertEquals( + 1, verificationDataset + .filter( + "target.id = 'orcid:0000-0001-9317-9350' and " + + "relType.name = 'hasCoAuthor' and source.id = 'orcid:0000-0002-1114-4216'") + .count()); + + Assertions + .assertEquals( + 1, verificationDataset + .filter( + "target.id = 'mag:fakeMag' and " + + "relType.name = 'hasCoAuthor' and source.id = 'orcid:0000-0002-1114-4216'") + .count()); + + Assertions.assertEquals(4, verificationDataset.filter("relType.name = 'hasOtherManifestation'").count()); + +// +// Assertions.assertEquals(1, verificationDataset.filter("id = 'mag:fakemag'").count()); +// +// Assertions.assertEquals(2, verificationDataset.filter("substr(id,1,5) = 'orcid'").count()); + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultTest.java new file mode 100644 index 0000000000..156c130131 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/DumpResultTest.java @@ -0,0 +1,96 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; + +public class DumpResultTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(DumpResultTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DumpResultTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(DumpAuthorTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(DumpResultTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testDumpResultPids() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/pid/preparedInfo") + .getPath(); + + SparkDumpPidResult.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString(), + "-preparedInfoPath", sourcePath + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/result") + .map(item -> OBJECT_MAPPER.readValue(item, Entity.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Entity.class)); + + Assertions.assertEquals(35, verificationDataset.count()); + + Assertions.assertEquals(32, verificationDataset.filter("substr(id,1,3) = 'doi'").count()); + + Assertions.assertEquals(1, verificationDataset.filter("substr(id,1,3) = 'pdb'").count()); + + Assertions.assertEquals(1, verificationDataset.filter("substr(id,1,4) = 'pmid'").count()); + + Assertions.assertEquals(1, verificationDataset.filter("substr(id,1,5) = 'arXiv'").count()); + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoCollectTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoCollectTest.java new file mode 100644 index 0000000000..7952c36f82 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoCollectTest.java @@ -0,0 +1,168 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class PreparedInfoCollectTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(PreparedInfoCollectTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PreparedInfoCollectTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PreparedInfoCollectTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PreparedInfoCollectTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testCollectAndSave() throws Exception { +//software and otherresearchproduct associated preparedInfo files intended to be empty + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/pid/preparedInfoSplit") + .getPath(); + + SparkCollectPreparedInfo.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-preparedInfoPath", sourcePath + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultPidsList.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(ResultPidsList.class)); + + Assertions.assertEquals(34, verificationDataset.count()); + + verificationDataset.createOrReplaceTempView("check"); + + Assertions + .assertEquals( + 33, spark + .sql( + "Select resultId " + + "from check " + + "lateral view explode (resultAllowedPids) r as resallowed " + + " where resallowed.key = 'doi'") + .count()); + + Assertions + .assertEquals( + 4, spark + .sql( + "SELECT pids.value " + + "FROM check " + + "LATERAL VIEW EXPLODE (authorAllowedPids) a as authallowed " + + "LATERAL VIEW EXPLODE (authallowed) a as pids " + + "WHERE pids.key = 'orcid'") + .count()); + + Assertions + .assertEquals( + 1, spark + .sql( + "SELECT pids.value " + + "FROM check " + + "LATERAL VIEW EXPLODE (authorAllowedPids) a as authallowed " + + "LATERAL VIEW EXPLODE (authallowed) a as pids " + + "WHERE pids.key = 'mag'") + .count()); + + Dataset check = spark + .sql( + "Select resultId, pids.value orcid, resallowed.key pid_type, resallowed.value pid " + + "from check" + + " lateral view explode(authorAllowedPids) a as authallowed " + + " lateral view explode(authallowed) p as pids " + + " lateral view explode (resultAllowedPids) r as resallowed " + + "where pids.key = 'orcid'"); + + Assertions.assertEquals(6, check.count()); + + Assertions + .assertEquals(1, check.filter("resultId = '50|base_oa_____::1a700385222228181f20835bae60a71e'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::1a700385222228181f20835bae60a71e' " + + "and orcid = '0000-0002-1114-4216'") + .count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::1a700385222228181f20835bae60a71e' " + + "and pid = '10.1016/j.jrmge.2016.11.005'") + .count()); + + Assertions + .assertEquals(1, check.filter("resultId = '50|base_oa_____::06546a1ad6b1c71e5e366ef15b9ade1f'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::06546a1ad6b1c71e5e366ef15b9ade1f' " + + "and orcid = '0000-0001-9317-9350'") + .count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::06546a1ad6b1c71e5e366ef15b9ade1f' " + + "and pid = '10.1016/j.jotr.2014.10.003'") + .count()); + + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoPidTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoPidTest.java new file mode 100644 index 0000000000..3b6bec7338 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/pid/PreparedInfoPidTest.java @@ -0,0 +1,260 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class PreparedInfoPidTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(PreparedInfoPidTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PreparedInfoPidTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PreparedInfoPidTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PreparedInfoPidTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testPublication() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/pid/result/publication") + .getPath(); + + SparkPrepareResultPids.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath, + "-allowedResultPids", "[\"doi\",\"arxiv\",\"pmc\",\"pmid\",\"pdb\"]", + "-allowedAuthorPids", "[\"orcid\"]", + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication" + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/preparedInfo/publication") + .map(item -> OBJECT_MAPPER.readValue(item, ResultPidsList.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(ResultPidsList.class)); + + Assertions.assertEquals(31, verificationDataset.count()); + + verificationDataset.createOrReplaceTempView("check"); + + Assertions + .assertEquals( + 31, spark + .sql( + "Select resultId " + + "from check " + + "lateral view explode (resultAllowedPids) r as resallowed " + + " where resallowed.key = 'doi'") + .count()); + + Dataset check = spark + .sql( + "Select resultId, pids.value orcid, resallowed.key pid_type, resallowed.value pid " + + "from check" + + " lateral view explode(authorAllowedPids) a as authallowed " + + " lateral view explode(authallowed) p as pids " + + " lateral view explode (resultAllowedPids) r as resallowed " + + "where pids.key = 'orcid'"); + + Assertions.assertEquals(2, check.count()); + + Assertions + .assertEquals(1, check.filter("resultId = '50|base_oa_____::1a700385222228181f20835bae60a71e'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::1a700385222228181f20835bae60a71e' " + + "and orcid = '0000-0002-1114-4216'") + .count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::1a700385222228181f20835bae60a71e' " + + "and pid = '10.1016/j.jrmge.2016.11.005'") + .count()); + + Assertions + .assertEquals(1, check.filter("resultId = '50|base_oa_____::06546a1ad6b1c71e5e366ef15b9ade1f'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::06546a1ad6b1c71e5e366ef15b9ade1f' " + + "and orcid = '0000-0001-9317-9350'") + .count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|base_oa_____::06546a1ad6b1c71e5e366ef15b9ade1f' " + + "and pid = '10.1016/j.jotr.2014.10.003'") + .count()); + + } + + @Test + public void testDataset() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/pid/result/dataset") + .getPath(); + + SparkPrepareResultPids.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath, + "-allowedResultPids", "[\"doi\",\"arxiv\",\"pmc\",\"pmid\",\"pdb\"]", + "-allowedAuthorPids", "[\"orcid\"]", + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset" + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/preparedInfo/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, ResultPidsList.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(ResultPidsList.class)); + + Assertions.assertEquals(3, verificationDataset.count()); + + verificationDataset.createOrReplaceTempView("check"); + + Dataset check = spark + .sql( + "Select resultId, pids.value orcid, resallowed.key pid_type, resallowed.value pid " + + "from check" + + " lateral view explode(authorAllowedPids) a as authallowed " + + " lateral view explode(authallowed) p as pids " + + " lateral view explode (resultAllowedPids) r as resallowed "); + + Assertions.assertEquals(4, check.count()); + + Assertions + .assertEquals(4, check.filter("resultId = '50|_____OmicsDI::00899d9cb1163754943a3277365adc02'").count()); + Assertions + .assertEquals( + 2, check + .filter( + "resultId = '50|_____OmicsDI::00899d9cb1163754943a3277365adc02' " + + "and (orcid = '0000-0001-9317-9350' or orcid = '0000-0002-1114-4216') and " + + "pid = '10.1016/fake' and pid_type = 'doi' ") + .count()); + + Assertions + .assertEquals( + 2, check + .filter( + "resultId = '50|_____OmicsDI::00899d9cb1163754943a3277365adc02' " + + "and (orcid = '0000-0001-9317-9350' or orcid = '0000-0002-1114-4216') and " + + "pid = '10443fake' and pid_type = 'pmid' ") + .count()); + + check = spark + .sql( + "Select resultId, authorAllowedPids, resallowed.key pid_type, resallowed.value pid " + + "from check" + + " lateral view explode (resultAllowedPids) r as resallowed "); + + Assertions.assertEquals(5, check.count()); + + Assertions + .assertEquals(2, check.filter("resultId = '50|_____OmicsDI::00899d9cb1163754943a3277365adc02'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|_____OmicsDI::00899d9cb1163754943a3277365adc02' " + + " and pid = '10.1016/fake' and pid_type = 'doi' ") + .count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|_____OmicsDI::00899d9cb1163754943a3277365adc02' " + + " and pid = '10443fake' and pid_type = 'pmid' ") + .count()); + + Assertions + .assertEquals(1, check.filter("resultId = '50|_____OmicsDI::023fd1fcbb64f0f7df0671798a62f379'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|_____OmicsDI::023fd1fcbb64f0f7df0671798a62f379' " + + " and pid = 'fakepdb' and pid_type = 'pdb' ") + .count()); + + Assertions + .assertEquals(2, check.filter("resultId = '50|_____OmicsDI::036d65211a6ac14237c6e2d7cc223386'").count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|_____OmicsDI::036d65211a6ac14237c6e2d7cc223386' " + + " and pid = '10.1016/j.tcs.2012.06.029' and pid_type = 'doi' ") + .count()); + Assertions + .assertEquals( + 1, check + .filter( + "resultId = '50|_____OmicsDI::036d65211a6ac14237c6e2d7cc223386' " + + " and pid = 'fake_arxiv' and pid_type = 'arXiv' ") + .count()); + + } +}