diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java index 0051ac01b2..78b311bc1c 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromorganization/ResultToCommunityJobTest.java @@ -1,4 +1,316 @@ package eu.dnetlib.dhp.resulttocommunityfromorganization; +import static org.apache.spark.sql.functions.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ResultToCommunityJobTest { + + private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ClassLoader cl = ResultToCommunityJobTest.class.getClassLoader(); + + private static SparkSession spark; + + private static Path workingDir; + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(ResultToCommunityJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(ResultToCommunityJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = + SparkSession.builder() + .appName(OrcidPropagationJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void test1() throws Exception { + SparkResultToCommunityFromOrganizationJob2.main( + new String[] { + "-isTest", + Boolean.TRUE.toString(), + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-sourcePath", + getClass() + .getResource("/eu/dnetlib/dhp/resulttocommunityfromorganization/sample") + .getPath(), + "-hive_metastore_uris", + "", + "-saveGraph", + "true", + "-resultTableName", + "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", + workingDir.toString() + "/dataset", + "-preparedInfoPath", + getClass() + .getResource( + "/eu/dnetlib/dhp/resulttocommunityfromorganization/preparedInfo") + .getPath() + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = + sc.textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = + spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = + "select id, MyT.id community " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'propagation'"; + + org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); + Assertions.assertEquals(5, resultExplodedProvenance.count()); + Assertions.assertEquals( + 0, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::afaf128022d29872c4dad402b2db04fe'") + .count()); + Assertions.assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::3f62cfc27024d564ea86760c494ba93b'") + .count()); + Assertions.assertEquals( + "beopen", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|dedup_wf_001::3f62cfc27024d564ea86760c494ba93b")) + .collectAsList() + .get(0) + .getString(0)); + + Assertions.assertEquals( + 2, + resultExplodedProvenance + .filter("id = '50|od________18::8887b1df8b563c4ea851eb9c882c9d7b'") + .count()); + Assertions.assertEquals( + "mes", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|od________18::8887b1df8b563c4ea851eb9c882c9d7b")) + .sort(desc("community")) + .collectAsList() + .get(0) + .getString(0)); + Assertions.assertEquals( + "euromarine", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|od________18::8887b1df8b563c4ea851eb9c882c9d7b")) + .sort(desc("community")) + .collectAsList() + .get(1) + .getString(0)); + + Assertions.assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|doajarticles::8d817039a63710fcf97e30f14662c6c8'") + .count()); + Assertions.assertEquals( + "mes", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|doajarticles::8d817039a63710fcf97e30f14662c6c8")) + .sort(desc("community")) + .collectAsList() + .get(0) + .getString(0)); + + Assertions.assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|doajarticles::3c98f0632f1875b4979e552ba3aa01e6'") + .count()); + Assertions.assertEquals( + "mes", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|doajarticles::3c98f0632f1875b4979e552ba3aa01e6")) + .sort(desc("community")) + .collectAsList() + .get(0) + .getString(0)); + + /* + {"communityList":["euromarine","mes"],"resultId":"50|doajarticles::8d817039a63710fcf97e30f14662c6c8"} "context" ["id": euromarine] updates = 1 + {"communityList":["euromarine","mes"],"resultId":"50|doajarticles::3c98f0632f1875b4979e552ba3aa01e6"} context = [ni, euromarine] updates = 1 + + */ + + query = + "select id, MyT.id community " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD "; + + org.apache.spark.sql.Dataset resultCommunityId = spark.sql(query); + + Assertions.assertEquals(10, resultCommunityId.count()); + + Assertions.assertEquals( + 1, + resultCommunityId + .filter("id = '50|dedup_wf_001::afaf128022d29872c4dad402b2db04fe'") + .count()); + Assertions.assertEquals( + "beopen", + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|dedup_wf_001::afaf128022d29872c4dad402b2db04fe")) + .collectAsList() + .get(0) + .getString(0)); + + Assertions.assertEquals( + 1, + resultCommunityId + .filter("id = '50|dedup_wf_001::3f62cfc27024d564ea86760c494ba93b'") + .count()); + + Assertions.assertEquals( + 3, + resultCommunityId + .filter("id = '50|od________18::8887b1df8b563c4ea851eb9c882c9d7b'") + .count()); + Assertions.assertEquals( + "beopen", + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|od________18::8887b1df8b563c4ea851eb9c882c9d7b")) + .sort(desc("community")) + .collectAsList() + .get(2) + .getString(0)); + + Assertions.assertEquals( + 2, + resultCommunityId + .filter("id = '50|doajarticles::8d817039a63710fcf97e30f14662c6c8'") + .count()); + Assertions.assertEquals( + "euromarine", + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|doajarticles::8d817039a63710fcf97e30f14662c6c8")) + .sort(desc("community")) + .collectAsList() + .get(1) + .getString(0)); + + Assertions.assertEquals( + 3, + resultCommunityId + .filter("id = '50|doajarticles::3c98f0632f1875b4979e552ba3aa01e6'") + .count()); + Assertions.assertEquals( + "euromarine", + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|doajarticles::3c98f0632f1875b4979e552ba3aa01e6")) + .sort(desc("community")) + .collectAsList() + .get(2) + .getString(0)); + Assertions.assertEquals( + "ni", + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|doajarticles::3c98f0632f1875b4979e552ba3aa01e6")) + .sort(desc("community")) + .collectAsList() + .get(0) + .getString(0)); + } } diff --git a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java index 9c4d37fb59..f8806d8bb8 100644 --- a/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java +++ b/dhp-workflows/dhp-propagation/src/test/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/ResultToCommunityJobTest.java @@ -1,4 +1,280 @@ package eu.dnetlib.dhp.resulttocommunityfromsemrel; +import static org.apache.spark.sql.functions.desc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ResultToCommunityJobTest { + + private static final Logger log = + LoggerFactory.getLogger( + eu.dnetlib.dhp.resulttocommunityfromsemrel.ResultToCommunityJobTest.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ClassLoader cl = + eu.dnetlib.dhp.resulttocommunityfromsemrel.ResultToCommunityJobTest.class + .getClassLoader(); + + private static SparkSession spark; + + private static Path workingDir; + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = + Files.createTempDirectory( + eu.dnetlib.dhp.resulttocommunityfromsemrel.ResultToCommunityJobTest.class + .getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName( + eu.dnetlib.dhp.resulttocommunityfromsemrel.ResultToCommunityJobTest.class + .getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = + SparkSession.builder() + .appName(OrcidPropagationJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void test1() throws Exception { + SparkResultToCommunityThroughSemRelJob4.main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", + getClass() + .getResource( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/sample") + .getPath(), + "-hive_metastore_uris", "", + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", workingDir.toString() + "/dataset", + "-preparedInfoPath", + getClass() + .getResource( + "/eu/dnetlib/dhp/resulttocommunityfromsemrel/preparedInfo") + .getPath() + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = + sc.textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = + spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = + "select id, MyT.id community " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'propagation'"; + + org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); + Assertions.assertEquals(5, resultExplodedProvenance.count()); + + Assertions.assertEquals( + 0, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b'") + .count()); + + Assertions.assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::0489ae524201eedaa775da282dce35e7'") + .count()); + Assertions.assertEquals( + "dh-ch", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|dedup_wf_001::0489ae524201eedaa775da282dce35e7")) + .collectAsList() + .get(0) + .getString(0)); + + Assertions.assertEquals( + 3, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28'") + .count()); + List rowList = + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28")) + .sort(desc("community")) + .collectAsList(); + Assertions.assertEquals("mes", rowList.get(0).getString(0)); + Assertions.assertEquals("fam", rowList.get(1).getString(0)); + Assertions.assertEquals("ee", rowList.get(2).getString(0)); + + Assertions.assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc'") + .count()); + Assertions.assertEquals( + "aginfra", + resultExplodedProvenance + .select("community") + .where( + resultExplodedProvenance + .col("id") + .equalTo( + "50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc")) + .collectAsList() + .get(0) + .getString(0)); + + query = + "select id, MyT.id community " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD "; + + org.apache.spark.sql.Dataset resultCommunityId = spark.sql(query); + + Assertions.assertEquals(10, resultCommunityId.count()); + + Assertions.assertEquals( + 2, + resultCommunityId + .filter("id = '50|dedup_wf_001::0489ae524201eedaa775da282dce35e7'") + .count()); + rowList = + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|dedup_wf_001::0489ae524201eedaa775da282dce35e7")) + .sort(desc("community")) + .collectAsList(); + Assertions.assertEquals("dh-ch", rowList.get(0).getString(0)); + Assertions.assertEquals("beopen", rowList.get(1).getString(0)); + + Assertions.assertEquals( + 3, + resultCommunityId + .filter("id = '50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28'") + .count()); + rowList = + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28")) + .sort(desc("community")) + .collectAsList(); + Assertions.assertEquals("mes", rowList.get(0).getString(0)); + Assertions.assertEquals("fam", rowList.get(1).getString(0)); + Assertions.assertEquals("ee", rowList.get(2).getString(0)); + + Assertions.assertEquals( + 2, + resultCommunityId + .filter("id = '50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc'") + .count()); + rowList = + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc")) + .sort(desc("community")) + .collectAsList(); + Assertions.assertEquals("beopen", rowList.get(0).getString(0)); + Assertions.assertEquals("aginfra", rowList.get(1).getString(0)); + + Assertions.assertEquals( + 2, + resultCommunityId + .filter("id = '50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b'") + .count()); + rowList = + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b")) + .sort(desc("community")) + .collectAsList(); + Assertions.assertEquals("euromarine", rowList.get(1).getString(0)); + Assertions.assertEquals("ni", rowList.get(0).getString(0)); + + Assertions.assertEquals( + 1, + resultCommunityId + .filter("id = '50|doajarticles::8d817039a63710fcf97e30f14662c6c8'") + .count()); + Assertions.assertEquals( + "euromarine", + resultCommunityId + .select("community") + .where( + resultCommunityId + .col("id") + .equalTo( + "50|doajarticles::8d817039a63710fcf97e30f14662c6c8")) + .collectAsList() + .get(0) + .getString(0)); + } }