From 0467145ae395845ad20d51b533e5701ee61899df Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 15 Jun 2020 11:13:51 +0200 Subject: [PATCH] test for graph dump --- .../dhp/oa/graph/dump/DumpJobTest.java | 920 ++++-------------- .../dump/PrepareResultProjectJobTest.java | 220 +++++ .../dump/QueryInformationSystemTest.java | 112 +++ .../oa/graph/dump/SplitForCommunityTest.java | 224 +++++ .../oa/graph/dump/UpdateProjectInfoTest.java | 134 +++ 5 files changed, 900 insertions(+), 710 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java index 6c9a3c252..73af66f85 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java @@ -1,29 +1,6 @@ package eu.dnetlib.dhp.oa.graph.dump; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.Dataset; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Software; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; -import org.junit.jupiter.api.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -31,13 +8,20 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.mockito.Mockito.lenient; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; -@ExtendWith(MockitoExtension.class) +//@ExtendWith(MockitoExtension.class) public class DumpJobTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -50,41 +34,57 @@ public class DumpJobTest { private static final Logger log = LoggerFactory.getLogger(DumpJobTest.class); - private static HashMap map = new HashMap<>(); + private static HashMap map = new HashMap<>(); - static{ + static { map.put("egi", "EGI Federation"); map.put("fet-fp7", "FET FP7"); map.put("fet-h2020", "FET H2020"); - map.put("clarin", "CLARIN");map.put("fam", "Fisheries and Aquaculture Management");map.put("ni", "Neuroinformatics");map.put("mes", "European Marine Scinece");map.put("instruct", "Instruct-Eric"); - map.put("rda", "Research Data Alliance");map.put("elixir-gr", "ELIXIR GR");map.put("aginfra", "Agricultural and Food Sciences");map.put("dariah", "DARIAH 
EU");map.put("risis", "RISI"); - map.put("ee", "SDSN - Greece");map.put("oa-pg", "EC Post-Grant Open Access Pilot");map.put("beopen", "Transport Research");map.put("euromarine", "Euromarine");map.put("ifremer", "Ifremer"); - map.put("dh-ch", "Digital Humanities and Cultural Heritage");map.put("science-innovation-policy", "Science and Innovation Policy Studies");map.put("covid-19", "COVID-19");map.put("enrmaps", "Energy Research");map.put("epos", "EPOS"); - + map.put("clarin", "CLARIN"); + map.put("fam", "Fisheries and Aquaculture Management"); + map.put("ni", "Neuroinformatics"); + map.put("mes", "European Marine Scinece"); + map.put("instruct", "Instruct-Eric"); + map.put("rda", "Research Data Alliance"); + map.put("elixir-gr", "ELIXIR GR"); + map.put("aginfra", "Agricultural and Food Sciences"); + map.put("dariah", "DARIAH EU"); + map.put("risis", "RISI"); + map.put("ee", "SDSN - Greece"); + map.put("oa-pg", "EC Post-Grant Open Access Pilot"); + map.put("beopen", "Transport Research"); + map.put("euromarine", "Euromarine"); + map.put("ifremer", "Ifremer"); + map.put("dh-ch", "Digital Humanities and Cultural Heritage"); + map.put("science-innovation-policy", "Science and Innovation Policy Studies"); + map.put("covid-19", "COVID-19"); + map.put("enrmaps", "Energy Research"); + map.put("epos", "EPOS"); } - @Mock - private SparkDumpCommunityProducts dumpCommunityProducts; +// @Mock +// private SparkDumpCommunityProducts dumpCommunityProducts; - private QueryInformationSystem queryInformationSystem; + // private QueryInformationSystem queryInformationSystem; - @Mock - private ISLookUpService isLookUpService; +// @Mock +// private ISLookUpService isLookUpService; - - List communityMap = Arrays.asList("", - "" , - "" , + List communityMap = Arrays + .asList( + "", + "", + "", "", "", "", - "" , + "", "", "", "", "", - "" , + "", "", "", "", @@ -97,13 +97,14 @@ public class DumpJobTest { "", ""); - private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + - " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + - " return " + - " " + - "{$x//CONFIGURATION/context/@id}" + - "{$x//CONFIGURATION/context/@label}" + - ""; + private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + + + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + + " return " + + " " + + "{$x//CONFIGURATION/context/@id}" + + "{$x//CONFIGURATION/context/@label}" + + ""; @BeforeAll public static void beforeAll() throws IOException { @@ -127,12 +128,12 @@ public class DumpJobTest { .getOrCreate(); } - @BeforeEach - public void setUp() throws ISLookUpException { - lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap); - lenient().when(dumpCommunityProducts.getIsLookUpService(MOCK_IS_LOOK_UP_URL)).thenReturn(isLookUpService); - - } +// @BeforeEach +// public void setUp() throws ISLookUpException { +// lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap); +// lenient().when(dumpCommunityProducts.getIsLookUpService(MOCK_IS_LOOK_UP_URL)).thenReturn(isLookUpService); +// +// } @AfterAll public static void afterAll() throws IOException { @@ -141,672 +142,171 @@ public class DumpJobTest { } @Test - public void test1() throws Exception { + public void testDataset() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/dump/dataset.json") - .getPath(); + 
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/dataset.json") + .getPath(); - dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + SparkDumpCommunityProducts.main(new String[] { + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/dataset", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-dumpTableName", "eu.dnetlib.dhp.schema.dump.oaf.Dataset", + "-communityMap", new Gson().toJson(map) + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dataset") - .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Dataset.class)); + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Dataset.class)); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Dataset.class)); + .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Dataset.class)); + Assertions.assertEquals(90, verificationDataset.count()); + // verificationDataset.show(false); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset + .filter("bestAccessright.code = 'c_abf2' and bestAccessright.label = 'OPEN'") + .count()); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_16ec'").count() == verificationDataset + .filter("bestAccessright.code = 'c_16ec' and bestAccessright.label = 'RESTRICTED'") + .count()); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_14cb'").count() == verificationDataset + .filter("bestAccessright.code = 'c_14cb' and bestAccessright.label = 'CLOSED'") + .count()); + + Assertions + .assertTrue( + verificationDataset.filter("bestAccessright.code = 'c_f1cf'").count() == verificationDataset + .filter("bestAccessright.code = 'c_f1cf' and bestAccessright.label = 'EMBARGO'") + .count()); + + Assertions.assertTrue(verificationDataset.filter("size(context) > 0").count() == 90); + + verificationDataset.select("instance.type").show(false); + +//TODO verify value and name of the fields for vocab related value (i.e. 
accessright, bestaccessright) + + } + + @Test + public void testPublication() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication.json") + .getPath(); + + SparkDumpCommunityProducts.main(new String[] { + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/publication", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", + "-dumpTableName", "eu.dnetlib.dhp.schema.dump.oaf.Publication", + "-communityMap", new Gson().toJson(map) + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/publication") + .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Publication.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Publication.class)); + + Assertions.assertEquals(76, verificationDataset.count()); verificationDataset.show(false); +//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright) } + @Test - public void test0() throws ISLookUpException { - System.out.println(new Gson().toJson(queryInformationSystem.getCommunityMap())); + public void testSoftware() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/software.json") + .getPath(); + + SparkDumpCommunityProducts.main(new String[] { + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/software", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software", + "-dumpTableName", "eu.dnetlib.dhp.schema.dump.oaf.Software", + "-communityMap", new Gson().toJson(map) + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/software") + .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.Software.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.Software.class)); + + Assertions.assertEquals(6, verificationDataset.count()); + verificationDataset.show(false); + +//TODO verify value and name of the fields for vocab related value (i.e. 
accessright, bestaccessright) + + } + + @Test + public void testORP() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/orp.json") + .getPath(); + + SparkDumpCommunityProducts.main(new String[] { + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/orp", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct", + "-dumpTableName", "eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct", + "-communityMap", new Gson().toJson(map) + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/orp") + .map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.OtherResearchProduct.class)); + + Assertions.assertEquals(3, verificationDataset.count()); + verificationDataset.show(false); + +//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright) + } -// @Test -// public void bulktagBySubjectNoPreviousContextTest() throws Exception { -// final String sourcePath = getClass() -// .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext") -// .getPath(); -// final String pathMap = DumpJobTest.pathMap; -// DumpJobTest -// .main( -// new String[] { -// "-isTest", Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", sourcePath, -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", -// "-outputPath", workingDir.toString() + "/dataset", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + "/dataset") -// .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); -// -// verificationDataset.createOrReplaceTempView("dataset"); -// -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " -// + "from dataset " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyD.inferenceprovenance = 'bulktagging'"; -// -// Assertions.assertEquals(5, spark.sql(query).count()); -// -// org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); -// Assertions -// .assertEquals( -// 5, idExplodeCommunity.filter("provenance = 'community:subject'").count()); -// Assertions -// .assertEquals( -// 5, -// idExplodeCommunity.filter("name = 'Bulktagging for Community - Subject'").count()); -// -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count()); -// 
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count()); -// -// Assertions -// .assertEquals( -// 1, -// idExplodeCommunity -// .filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'") -// .count()); -// Assertions -// .assertEquals( -// 1, -// idExplodeCommunity -// .filter( -// "community = 'covid-19' and id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'") -// .count()); -// -// Assertions -// .assertEquals( -// 2, -// idExplodeCommunity -// .filter("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'") -// .count()); -// Assertions -// .assertEquals( -// 2, -// idExplodeCommunity -// .filter( -// "(community = 'covid-19' or community = 'aginfra') and id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'") -// .count()); -// -// Assertions -// .assertEquals( -// 2, -// idExplodeCommunity -// .filter("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'") -// .count()); -// Assertions -// .assertEquals( -// 2, -// idExplodeCommunity -// .filter( -// "(community = 'mes' or community = 'fam') and id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'") -// .count()); -// } -// -// @Test -// public void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception { -// final String sourcePath = getClass() -// .getResource( -// "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance") -// .getPath(); -// final String pathMap = BulkTagJobTest.pathMap; -// SparkBulkTagJob -// .main( -// new String[] { -// "-isTest", Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", sourcePath, -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", -// "-outputPath", workingDir.toString() + "/dataset", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + "/dataset") -// .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); -// -// verificationDataset.createOrReplaceTempView("dataset"); -// -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance " -// + "from dataset " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyT.id = 'covid-19' "; -// -// Assertions.assertEquals(3, spark.sql(query).count()); -// -// org.apache.spark.sql.Dataset communityContext = spark.sql(query); -// -// Assertions -// .assertEquals( -// 2, -// communityContext -// .filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'") -// .count()); -// Assertions -// .assertEquals( -// 1, -// communityContext -// .filter( -// "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'community:subject'") -// .count()); -// Assertions -// .assertEquals( -// 1, -// communityContext -// .filter( -// "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'propagation:community:productsthroughsemrel'") -// .count()); -// -// query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " -// + "from dataset " -// + "lateral view explode (context) as MyT " -// + "where size(MyT.datainfo) > 0"; -// -// Assertions -// .assertEquals( -// 2, -// spark -// .sql(query) -// .select("datainfosize") -// 
.where( -// "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' a" -// + "nd community = 'covid-19'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// } -// -// @Test -// public void bulktagByDatasourceTest() throws Exception { -// final String sourcePath = getClass() -// .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource") -// .getPath(); -// SparkBulkTagJob -// .main( -// new String[] { -// "-isTest", Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", sourcePath, -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", -// "-outputPath", workingDir.toString() + "/publication", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + "/publication") -// .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(Publication.class)); -// -// verificationDataset.createOrReplaceTempView("publication"); -// -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " -// + "from publication " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyD.inferenceprovenance = 'bulktagging'"; -// -// org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); -// -// Assertions.assertEquals(5, idExplodeCommunity.count()); -// Assertions -// .assertEquals( -// 5, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); -// Assertions -// .assertEquals( -// 5, -// idExplodeCommunity -// .filter("name = 'Bulktagging for Community - Datasource'") -// .count()); -// -// Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'fam'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count()); -// -// Assertions -// .assertEquals( -// 3, -// idExplodeCommunity -// .filter( -// "community = 'fam' and (id = '50|ec_fp7health::000085c89f4b96dc2269bd37edb35306' " -// + "or id = '50|ec_fp7health::000b9e61f83f5a4b0c35777b7bccdf38' " -// + "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')") -// .count()); -// -// Assertions -// .assertEquals( -// 2, -// idExplodeCommunity -// .filter( -// "community = 'aginfra' and (id = '50|ec_fp7health::000c8195edd542e4e64ebb32172cbf89' " -// + "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')") -// .count()); -// } -// -// @Test -// public void bulktagByZenodoCommunityTest() throws Exception { -// final String sourcePath = getClass() -// .getResource( -// "/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity") -// .getPath(); -// SparkBulkTagJob -// .main( -// new String[] { -// "-isTest", Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", sourcePath, -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct", -// "-outputPath", workingDir.toString() + "/orp", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + 
"/orp") -// .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(OtherResearchProduct.class)); -// -// verificationDataset.createOrReplaceTempView("orp"); -// -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " -// + "from orp " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyD.inferenceprovenance = 'bulktagging'"; -// -// org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); -// Assertions.assertEquals(8, idExplodeCommunity.count()); -// -// Assertions -// .assertEquals( -// 8, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count()); -// Assertions -// .assertEquals( -// 8, -// idExplodeCommunity.filter("name = 'Bulktagging for Community - Zenodo'").count()); -// -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'covid-19'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'beopen'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'mes'").count()); -// -// Assertions -// .assertEquals( -// 1, -// idExplodeCommunity -// .filter( -// "id = '50|od______2017::0750a4d0782265873d669520f5e33c07' " -// + "and community = 'covid-19'") -// .count()); -// Assertions -// .assertEquals( -// 3, -// idExplodeCommunity -// .filter( -// "id = '50|od______2017::1bd97baef19dbd2db3203b112bb83bc5' and " -// + "(community = 'aginfra' or community = 'mes' or community = 'fam')") -// .count()); -// Assertions -// .assertEquals( -// 1, -// idExplodeCommunity -// .filter( -// "id = '50|od______2017::1e400f1747487fd15998735c41a55c72' " -// + "and community = 'beopen'") -// .count()); -// Assertions -// .assertEquals( -// 3, -// idExplodeCommunity -// .filter( -// "id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' and " -// + "(community = 'beopen' or community = 'fam' or community = 'mes')") -// .count()); -// -// query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " -// + "from orp " -// + "lateral view explode (context) as MyT " -// + "where size(MyT.datainfo) > 0"; -// -// Assertions -// .assertEquals( -// 2, -// spark -// .sql(query) -// .select("datainfosize") -// .where( -// "id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' a" -// + "nd community = 'beopen'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// -// // verify the zenodo community context is not present anymore in the records -// query = "select id, MyT.id community " -// + "from orp " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD "; -// -// org.apache.spark.sql.Dataset tmp2 = spark.sql(query); -// -// Assertions -// .assertEquals( -// 0, -// tmp2 -// .select("community") -// .where(tmp2.col("community").contains(ZENODO_COMMUNITY_INDICATOR)) -// .count()); -// } -// -// @Test -// public void bulktagBySubjectDatasourceTest() throws Exception { -// final String sourcePath = getClass() -// .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource") -// .getPath(); -// SparkBulkTagJob -// .main( -// new String[] { -// "-isTest", 
Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", sourcePath, -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", -// "-outputPath", workingDir.toString() + "/dataset", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + "/dataset") -// .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); -// -// verificationDataset.createOrReplaceTempView("dataset"); -// -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " -// + "from dataset " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyD.inferenceprovenance = 'bulktagging'"; -// -// org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); -// Assertions.assertEquals(7, idExplodeCommunity.count()); -// -// Assertions -// .assertEquals( -// 5, idExplodeCommunity.filter("provenance = 'community:subject'").count()); -// Assertions -// .assertEquals( -// 2, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count()); -// Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count()); -// -// query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " -// + "from dataset " -// + "lateral view explode (context) as MyT " -// + "where size(MyT.datainfo) > 0"; -// -// org.apache.spark.sql.Dataset tmp2 = spark.sql(query); -// -// Assertions -// .assertEquals( -// 2, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and " -// + "community = 'aginfra'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// -// Assertions -// .assertEquals( -// 1, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and " -// + "community = 'covid-19'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// -// Assertions -// .assertEquals( -// 2, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and " -// + "community = 'fam'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// Assertions -// .assertEquals( -// 2, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and " -// + "community = 'covid-19'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// -// Assertions -// .assertEquals( -// 1, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and " -// + "community = 'fam'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// Assertions -// .assertEquals( -// 1, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and " -// + "community = 'mes'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// } -// -// @Test -// 
public void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception { -// -// SparkBulkTagJob -// .main( -// new String[] { -// "-isTest", Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(), -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software", -// "-outputPath", workingDir.toString() + "/software", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + "/software") -// .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(Software.class)); -// -// verificationDataset.createOrReplaceTempView("software"); -// -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " -// + "from software " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyD.inferenceprovenance = 'bulktagging'"; -// -// org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); -// Assertions.assertEquals(10, idExplodeCommunity.count()); -// -// idExplodeCommunity.show(false); -// Assertions -// .assertEquals( -// 3, idExplodeCommunity.filter("provenance = 'community:subject'").count()); -// Assertions -// .assertEquals( -// 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); -// Assertions -// .assertEquals( -// 4, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count()); -// -// Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'covid-19'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dh-ch'").count()); -// Assertions.assertEquals(4, idExplodeCommunity.filter("community = 'aginfra'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dariah'").count()); -// Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count()); -// -// Assertions -// .assertEquals( -// 2, -// idExplodeCommunity -// .filter( -// "provenance = 'community:zenodocommunity' and " -// + "id = '50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4' and (" -// + "community = 'dh-ch' or community = 'dariah')") -// .count()); -// -// query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " -// + "from software " -// + "lateral view explode (context) as MyT " -// + "where size(MyT.datainfo) > 0"; -// -// org.apache.spark.sql.Dataset tmp2 = spark.sql(query); -// -// Assertions -// .assertEquals( -// 2, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______1582::501b25d420f808c8eddcd9b16e917f11' and " -// + "community = 'covid-19'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// -// Assertions -// .assertEquals( -// 3, -// tmp2 -// .select("datainfosize") -// .where( -// "id = '50|od______1582::581621232a561b7e8b4952b18b8b0e56' and " -// + "community = 'aginfra'") -// .collectAsList() -// .get(0) -// .getInt(0)); -// } -// -// @Test -// public void bulktagDatasourcewithConstraintsTest() throws Exception { -// -// final String sourcePath = getClass() -// .getResource( -// 
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints") -// .getPath(); -// SparkBulkTagJob -// .main( -// new String[] { -// "-isTest", Boolean.TRUE.toString(), -// "-isSparkSessionManaged", Boolean.FALSE.toString(), -// "-sourcePath", sourcePath, -// "-taggingConf", taggingConf, -// "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", -// "-outputPath", workingDir.toString() + "/dataset", -// "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, -// "-pathMap", pathMap -// }); -// -// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); -// -// JavaRDD tmp = sc -// .textFile(workingDir.toString() + "/dataset") -// .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); -// -// Assertions.assertEquals(10, tmp.count()); -// org.apache.spark.sql.Dataset verificationDataset = spark -// .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); -// -// verificationDataset.createOrReplaceTempView("dataset"); -// String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " -// + "from dataset " -// + "lateral view explode(context) c as MyT " -// + "lateral view explode(MyT.datainfo) d as MyD " -// + "where MyD.inferenceprovenance = 'bulktagging'"; -// -// org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); -// -// idExplodeCommunity.show(false); -// Assertions.assertEquals(3, idExplodeCommunity.count()); -// -// Assertions -// .assertEquals( -// 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); -// } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java index b50f9414f..ef54933b5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/PrepareResultProjectJobTest.java @@ -1,4 +1,224 @@ + package eu.dnetlib.dhp.oa.graph.dump; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + public class PrepareResultProjectJobTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory + .getLogger(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class); + + private static HashMap map = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()); + + 
conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(eu.dnetlib.dhp.oa.graph.dump.PrepareResultProjectJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testNoMatch() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/no_match") + .getPath(); + + SparkPrepareResultProject.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + + Assertions.assertEquals(0, verificationDataset.count()); + + } + + @Test + public void testMatchOne() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match_one") + .getPath(); + + SparkPrepareResultProject.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath + }); + +// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset"); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + + Assertions.assertTrue(verificationDataset.count() == 1); + + Assertions + .assertEquals( + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + + } + + @Test + public void testMatch() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/resultProject/match") + .getPath(); + + SparkPrepareResultProject.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/preparedInfo", + "-sourcePath", sourcePath + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/preparedInfo") + .map(item -> OBJECT_MAPPER.readValue(item, ResultProject.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(ResultProject.class)); + + Assertions.assertTrue(verificationDataset.count() == 2); + + Assertions + 
.assertEquals( + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + Assertions + .assertEquals( + 1, + verificationDataset.filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = "select resultId, MyT.id project , MyT.title title, MyT.acronym acronym " + + "from dataset " + + "lateral view explode(projectsList) p as MyT "; + + org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); + Assertions.assertEquals(3, resultExplodedProvenance.count()); + Assertions + .assertEquals( + 2, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter("resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") + .count()); + + Assertions + .assertEquals( + 2, + resultExplodedProvenance + .filter("project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::0f7d119de1f656b5763a16acf876fed6' and resultId = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter("project = '40|aka_________::03376222b28a3aebf2730ac514818d04'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter( + "project = '40|aka_________::03376222b28a3aebf2730ac514818d04' and resultId = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'") + .count()); + + resultExplodedProvenance.show(false); + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystemTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystemTest.java index fb3269eed..222bff11d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystemTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/QueryInformationSystemTest.java @@ -1,4 +1,116 @@ + package eu.dnetlib.dhp.oa.graph.dump; +import static org.mockito.Mockito.lenient; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.neethi.Assertion; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.google.gson.Gson; + +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +@ExtendWith(MockitoExtension.class) public class QueryInformationSystemTest { + + private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + + + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + + " return " + + " " + + "{$x//CONFIGURATION/context/@id}" + + "{$x//CONFIGURATION/context/@label}" + + ""; + + List communityMap = Arrays + .asList( + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + 
"", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + ""); + + @Mock + private ISLookUpService isLookUpService; + + private QueryInformationSystem queryInformationSystem; + + private Map map; + + @BeforeEach + public void setUp() throws ISLookUpException { + lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap); + queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(isLookUpService); + map = queryInformationSystem.getCommunityMap(); + } + + @Test + public void testSize() throws ISLookUpException { + + Assertions.assertEquals(23, map.size()); + } + + @Test + public void testContent() { + Assertions.assertTrue(map.containsKey("egi") && map.get("egi").equals("EGI Federation")); + + Assertions.assertTrue(map.containsKey("fet-fp7") && map.get("fet-fp7").equals("FET FP7")); + Assertions.assertTrue(map.containsKey("fet-h2020") && map.get("fet-h2020").equals("FET H2020")); + Assertions.assertTrue(map.containsKey("clarin") && map.get("clarin").equals("CLARIN")); + Assertions.assertTrue(map.containsKey("rda") && map.get("rda").equals("Research Data Alliance")); + Assertions.assertTrue(map.containsKey("ee") && map.get("ee").equals("SDSN - Greece")); + Assertions + .assertTrue( + map.containsKey("dh-ch") && map.get("dh-ch").equals("Digital Humanities and Cultural Heritage")); + Assertions.assertTrue(map.containsKey("fam") && map.get("fam").equals("Fisheries and Aquaculture Management")); + Assertions.assertTrue(map.containsKey("ni") && map.get("ni").equals("Neuroinformatics")); + Assertions.assertTrue(map.containsKey("mes") && map.get("mes").equals("European Marine Science")); + Assertions.assertTrue(map.containsKey("instruct") && map.get("instruct").equals("Instruct-ERIC")); + Assertions.assertTrue(map.containsKey("elixir-gr") && map.get("elixir-gr").equals("ELIXIR GR")); + Assertions + .assertTrue(map.containsKey("aginfra") && map.get("aginfra").equals("Agricultural and Food Sciences")); + Assertions.assertTrue(map.containsKey("dariah") && map.get("dariah").equals("DARIAH EU")); + Assertions.assertTrue(map.containsKey("risis") && map.get("risis").equals("RISIS")); + Assertions.assertTrue(map.containsKey("epos") && map.get("epos").equals("EPOS")); + Assertions.assertTrue(map.containsKey("beopen") && map.get("beopen").equals("Transport Research")); + Assertions.assertTrue(map.containsKey("euromarine") && map.get("euromarine").equals("EuroMarine")); + Assertions.assertTrue(map.containsKey("ifremer") && map.get("ifremer").equals("Ifremer")); + Assertions.assertTrue(map.containsKey("oa-pg") && map.get("oa-pg").equals("EC Post-Grant Open Access Pilot")); + Assertions + .assertTrue( + map.containsKey("science-innovation-policy") + && map.get("science-innovation-policy").equals("Science and Innovation Policy Studies")); + Assertions.assertTrue(map.containsKey("covid-19") && map.get("covid-19").equals("COVID-19")); + Assertions.assertTrue(map.containsKey("enermaps") && map.get("enermaps").equals("Energy Research")); + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java index bd86ad083..4d4608889 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/SplitForCommunityTest.java @@ -1,4 +1,228 @@ + package 
eu.dnetlib.dhp.oa.graph.dump; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; + +import eu.dnetlib.dhp.schema.dump.oaf.Software; + public class SplitForCommunityTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp"; + + private static final Logger log = LoggerFactory.getLogger(DumpJobTest.class); + + private static HashMap map = new HashMap<>(); + + static { + map.put("egi", "EGI Federation"); + map.put("fet-fp7", "FET FP7"); + map.put("fet-h2020", "FET H2020"); + map.put("clarin", "CLARIN"); + map.put("fam", "Fisheries and Aquaculture Management"); + map.put("ni", "Neuroinformatics"); + map.put("mes", "European Marine Scinece"); + map.put("instruct", "Instruct-Eric"); + map.put("rda", "Research Data Alliance"); + map.put("elixir-gr", "ELIXIR GR"); + map.put("aginfra", "Agricultural and Food Sciences"); + map.put("dariah", "DARIAH EU"); + map.put("risis", "RISI"); + map.put("ee", "SDSN - Greece"); + map.put("oa-pg", "EC Post-Grant Open Access Pilot"); + map.put("beopen", "Transport Research"); + map.put("euromarine", "Euromarine"); + map.put("ifremer", "Ifremer"); + map.put("dh-ch", "Digital Humanities and Cultural Heritage"); + map.put("science-innovation-policy", "Science and Innovation Policy Studies"); + map.put("covid-19", "COVID-19"); + map.put("enrmaps", "Energy Research"); + map.put("epos", "EPOS"); + + } + +// @Mock +// private SparkDumpCommunityProducts dumpCommunityProducts; + + // private QueryInformationSystem queryInformationSystem; + +// @Mock +// private ISLookUpService isLookUpService; + + List communityMap = Arrays + .asList( + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + ""); + + private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + + + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + + " return " + + " " + + "{$x//CONFIGURATION/context/@id}" + + "{$x//CONFIGURATION/context/@label}" + + ""; + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(SplitForCommunityTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(SplitForCommunityTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + 
.appName(SplitForCommunityTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + +// @BeforeEach +// public void setUp() throws ISLookUpException { +// lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap); +// lenient().when(dumpCommunityProducts.getIsLookUpService(MOCK_IS_LOOK_UP_URL)).thenReturn(isLookUpService); +// +// } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void test1() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/splitForCommunity/software") + .getPath(); + + SparkSplitForCommunity.main(new String[] { + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-outputPath", workingDir.toString() + "/split", + "-sourcePath", sourcePath, + "-resultTableName", "eu.dnetlib.dhp.schema.dump.oaf.Software", + "-communityMap", new Gson().toJson(map) + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/split/dh-ch") + .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Software.class)); + + Assertions.assertEquals(1, verificationDataset.count()); + + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); + + tmp = sc + .textFile(workingDir.toString() + "/split/egi") + .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); + + verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Software.class)); + + Assertions.assertEquals(1, verificationDataset.count()); + + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + + tmp = sc + .textFile(workingDir.toString() + "/split/ni") + .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); + + verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Software.class)); + + Assertions.assertEquals(1, verificationDataset.count()); + + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|datacite____::6b1e3a2fa60ed8c27317a66d6357f795'").count()); + + tmp = sc + .textFile(workingDir.toString() + "/split/science-innovation-policy") + .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); + + verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Software.class)); + + Assertions.assertEquals(4, verificationDataset.count()); + + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::0347b1cd516fc59e41ba92e0d74e4e9f'").count()); + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::1432beb6171baa5da8a85a7f99545d69'").count()); + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::1c8bd19e633976e314b88ce5c3f92d69'").count()); + Assertions + .assertEquals( + 1, verificationDataset.filter("id = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80'").count()); + + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java index cab431bb3..a7599c09f 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/UpdateProjectInfoTest.java @@ -1,4 +1,138 @@ + package eu.dnetlib.dhp.oa.graph.dump; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; + +import org.apache.commons.io.FileUtils; +import org.apache.neethi.Assertion; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.dump.oaf.Software; + public class UpdateProjectInfoTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(eu.dnetlib.dhp.oa.graph.dump.UpdateProjectInfoTest.class); + + private static HashMap map = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(eu.dnetlib.dhp.oa.graph.dump.UpdateProjectInfoTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(eu.dnetlib.dhp.oa.graph.dump.UpdateProjectInfoTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(eu.dnetlib.dhp.oa.graph.dump.UpdateProjectInfoTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void test1() throws Exception { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/dump/addProjectInfo") + .getPath(); + + SparkUpdateProjectInfo.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-resultTableName", "eu.dnetlib.dhp.schema.dump.oaf.Software", + "-preparedInfoPath", sourcePath + "/preparedInfo", + "-outputPath", workingDir.toString() + "/ext/software", + "-sourcePath", sourcePath + "/software" + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/ext/software") + .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); + + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Software.class)); + + verificationDataset.show(false); + + Assertions.assertEquals(6, verificationDataset.count()); + verificationDataset.createOrReplaceTempView("dataset"); + + String query = "select id, MyT.code code, MyT.title title, MyT.funder.name funderName, MyT.funder.shortName funderShortName, " + + + "MyT.funder.jurisdiction funderJurisdiction, 
MyT.funder.fundingStream fundingStream " + + "from dataset " + + "lateral view explode(projects) p as MyT "; + + org.apache.spark.sql.Dataset resultExplodedProvenance = spark.sql(query); + + Assertions.assertEquals(3, resultExplodedProvenance.count()); + resultExplodedProvenance.show(false); + + Assertions + .assertEquals( + 2, + resultExplodedProvenance.filter("id = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb'").count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' and code = '123455'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::e4805d005bfab0cd39a1642cbf477fdb' and code = '119027'") + .count()); + + Assertions + .assertEquals( + 1, + resultExplodedProvenance + .filter("id = '50|dedup_wf_001::51b88f272ba9c3bb181af64e70255a80' and code = '123455'") + .count()); + + resultExplodedProvenance.show(false); + } + }
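
The tests introduced above all share the same round trip: run a Spark job locally so it writes JSON-line records under a temporary working directory, read those lines back with Jackson, wrap them in a typed Dataset via Encoders.bean, and assert on counts or SQL filters. A minimal, self-contained sketch of that round trip follows; the `Record` bean and the literal JSON lines are placeholders standing in for the eu.dnetlib.dhp.schema.dump.oaf classes and for the output of the job under test (e.g. SparkDumpCommunityProducts).

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

public class DumpRoundTripSketch {

	// placeholder bean standing in for the dump schema classes used by the real tests
	public static class Record {
		private String id;

		public String getId() { return id; }

		public void setId(String id) { this.id = id; }
	}

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) throws Exception {
		Path workingDir = Files.createTempDirectory(DumpRoundTripSketch.class.getSimpleName());

		SparkConf conf = new SparkConf()
			.setAppName(DumpRoundTripSketch.class.getSimpleName())
			.setMaster("local[*]")
			.set("spark.driver.host", "localhost")
			.set("spark.ui.enabled", "false")
			.set("spark.sql.warehouse.dir", workingDir.toString());

		SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		// in the tests this directory is produced by the job under test;
		// here sample JSON lines are written directly to keep the sketch runnable
		sc
			.parallelize(Arrays.asList(
				"{\"id\":\"50|fake_________::1\"}",
				"{\"id\":\"50|fake_________::2\"}"))
			.saveAsTextFile(workingDir.toString() + "/dump");

		// read the JSON lines back into beans and wrap them in a typed Dataset
		JavaRDD<Record> tmp = sc
			.textFile(workingDir.toString() + "/dump")
			.map(item -> OBJECT_MAPPER.readValue(item, Record.class));

		Dataset<Record> verification = spark.createDataset(tmp.rdd(), Encoders.bean(Record.class));

		// the tests assert on counts and on SQL filters over the Dataset
		System.out.println("records: " + verification.count());
		verification.createOrReplaceTempView("dump");
		System.out.println("matches: " + spark.sql("select id from dump where id like '50|%'").count());

		spark.stop();
		FileUtils.deleteDirectory(workingDir.toFile());
	}
}
```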