unit test for the four steps

This commit is contained in:
Miriam Baglioni 2020-07-16 17:27:13 +02:00
parent 0352c99848
commit 72336a6525
4 changed files with 777 additions and 0 deletions

View File: …/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/ExpandResultInfoTest.java

@@ -1,4 +1,140 @@
package eu.dnetlib.dhp.actionmanager.remapping;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class ExpandResultInfoTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = ExpandResultInfoTest.class
.getClassLoader();
private static SparkSession spark;
private static final String FAKE_ISLOOKUP = "http://beta.services.openaire.eu/";
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(ExpandResultInfoTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(ExpandResultInfoTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(ExpandResultInfoTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(ExpandResultInfoTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void expandAS() throws Exception {
SparkExpandResultInfo
.main(
new String[]{
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-asInputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step2/preparedInfo/as/as")
.getPath(),
"-relationInputPath",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step2/preparedInfo/relations/relations").getPath(),
"-outputPath",
workingDir.toString() + "/expandedActionSet"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ASResultInfo> tmp = sc
.textFile(workingDir.toString() + "/expandedActionSet")
.map(item -> OBJECT_MAPPER.readValue(item, ASResultInfo.class));
Dataset<ASResultInfo> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ASResultInfo.class));
Assertions.assertEquals(25, verificationDataset.count());
Assertions.assertEquals(0, verificationDataset.filter("substr(id,1,8) = '50|dedup'").count());
Assertions.assertEquals(3, verificationDataset.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a'").count());
Assertions.assertEquals(3, verificationDataset.filter("id = '50|od_______166::779de9b3a2d224779be52fae43b5fc80'").count());
Assertions.assertEquals(3, verificationDataset.filter("id = '50|od_______165::779de9b3a2d224779be52fae43b5fc80'").count());
Assertions.assertEquals(3, verificationDataset.filter("id = '50|od______3515::779de9b3a2d224779be52fae43b5fc80'").count());
Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::78329557c23bee513963ebf295d1434d'").count());
Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::8978b9b797294da5306950a94a58d98c'").count());
Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::fb2c70723d74f45329640255a959333d'").count());
Assertions.assertEquals(2, verificationDataset.filter("id = '50|base_oa_____::fb2c70723d74f45329640255a959333d'").count());
Assertions.assertEquals(5, verificationDataset.filter("id = '50|_____OmicsDI::039dbb63f11b19dc15113b34ebceb0d2' " +
"or id = '50|_____OmicsDI::05f133acca27d72866c6720a95515f57' or " +
"id = '50|_____OmicsDI::2d508eba981699a30e969d1ab5a068b8' or " +
"id = '50|datacite____::00bddedc38dc045780dc84c27bc8fecd' or " +
"id = '50|datacite____::00f7f89392fa75e944dc8d329e9e8024'").count());
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
"FROM verificationDataset " +
"LATERAL VIEW EXPLODE(value) v as val"));
Assertions.assertEquals(25, verify.count());
Assertions.assertEquals(20, verify.filter("type = 'relation'").count());
Assertions.assertEquals(5, verify.filter("type = 'result'").count());
Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
"and value = '40|rcuk________::8dec51859e6b66cd040670b432b9e59c' and " +
"prov = 'iis::document_referencedProjects' and " +
"trust = '0.897'").count());
Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
"and value = '40|rcuk________::5e312e08bd65f126d7d79b3d1d677eb3' and " +
"prov = 'iis::document_referencedProjects' and " +
"trust = '0.897'").count());
Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
"and value = '40|corda_______::6d500f8fceb2bb81b0750820469e1cd8' and " +
"prov = 'iis::document_referencedProjects' and " +
"trust = '0.7085'").count());
}
}
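
The ASResultInfo bean deserialized above is not part of this diff. A minimal sketch consistent with the Encoders.bean usage and the val.value, val.trust and val.inference_provenance columns queried by the tests could look as follows; everything beyond the field names visible in the SQL is an assumption.

package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;
import java.util.List;

// Hypothetical sketch: fields inferred from the LATERAL VIEW queries in the tests above.
public class ASResultInfo implements Serializable {

    private String id;         // OpenAIRE result id, e.g. "50|doiboost____::..."
    private String type;       // "result" or "relation", as asserted in the tests
    private List<Value> value; // one element per inferred value

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public List<Value> getValue() { return value; }
    public void setValue(List<Value> value) { this.value = value; }

    // Nested here for compactness; the real type may live in its own file.
    public static class Value implements Serializable {
        private String value;                // e.g. a project id or a community id
        private String trust;                // e.g. "0.897"
        private String inference_provenance; // e.g. "iis::document_referencedProjects"

        public String getValue() { return value; }
        public void setValue(String value) { this.value = value; }
        public String getTrust() { return trust; }
        public void setTrust(String trust) { this.trust = trust; }
        public String getInference_provenance() { return inference_provenance; }
        public void setInference_provenance(String inference_provenance) { this.inference_provenance = inference_provenance; }
    }
}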

View File: …/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/PrepareInfoTest.java

@@ -1,4 +1,358 @@
package eu.dnetlib.dhp.actionmanager.remapping;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
public class PrepareInfoTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = PrepareInfoTest.class
.getClassLoader();
private static SparkSession spark;
private static final String FAKE_ISLOOKUP = "http://beta.services.openaire.eu/";
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(PrepareInfoTest.class);
private static final List<ActionSet> actionSetList =
Arrays.asList(ActionSet.newInstance("iis-dataset-entities-preprocessing", "rawset_74155091-ce3b-4951-849c-f41dd4186699_1555613756167","entities_dataset"),
ActionSet.newInstance("iis-document-statistics","rawset_d0a24381-1241-4d83-9669-22110ab72f63_1415965369474","document_statistics"),
ActionSet.newInstance("iis-researchinitiative", "rawset_718b528a-0a10-4303-9290-4f61c04b7ace_1594030710764","document_research_initiative"),
ActionSet.newInstance("iis-document-citations","rawset_b22d36c2-36e5-4ff3-97ef-946fa84a57dd_1594030710774","document_referencedDocuments"),
ActionSet.newInstance("iis-dataset-entities-main","rawset_fffa6131-3e7d-4c2c-82d8-844517e721c0_1594030710760","entities_dataset"),
ActionSet.newInstance("iis-document-affiliation","rawset_d62066d3-b6d9-424a-bea0-bab884a55292_1594030710732","matched_doc_organizations"),
ActionSet.newInstance("iis-document-classes","rawset_35b252fb-2180-4115-818d-a8110616b892_1594030710771","document_classes"),
ActionSet.newInstance("iis-document-similarities","rawset_cc4706b4-ed1d-4862-a13b-b0afdd7016a3_1594030710768","document_similarities_standard"),
ActionSet.newInstance("iis-referenced-datasets-main","rawset_c2ea95d3-a2c0-48f4-9184-5f3478399cc6_1594030710757","document_referencedDatasets"),
ActionSet.newInstance("iis-referenced-datasets-preprocessing","rawset_91543cfa-b543-46c1-a87f-b1a550bc6937_1555613756158","document_referencedDatasets"),
ActionSet.newInstance("iis-referenced-projects-main","rawset_ccf0d39d-0077-4e61-af6c-dc191e2fca68_1594030710754","document_referencedProjects"),
ActionSet.newInstance("iis-referenced-projects-preprocessing","rawset_564ae405-a221-472f-8bd0-ee8bfbbd9164_1555613756135","document_referencedProjects"),
ActionSet.newInstance("iis-referenceextraction-pdb","rawset_07dc3b63-e5a4-4a54-90ba-5226fd55f1c9_1594030710776","document_pdb"),
ActionSet.newInstance("document_software_url","rawset_75e2b097-2a10-41a7-97eb-70737b678793_1594030710779","document_software_url"),
ActionSet.newInstance("iis-wos-entities","rawset_handled6-0ab3-4fd0-a33b-refereed2fc0_1555613756163","entities_document"),
ActionSet.newInstance("iis-extracted-metadata","rawset_f859722c-bfec-4711-9132-8b24766c208d_1415965369473","document_metadata"),
ActionSet.newInstance("dedup-similarity-organization-simple","rawset_a62362a8-9800-4ba3-a060-1fbc0e3ea1a5_1587998691703","dedup-similarity-organization-simple"),
ActionSet.newInstance("dedup-similarity-organization","rawset_9da5e0f1-a49a-40fc-aaac-17de80fa5ceb_1436537292583","dedup-similarity-organization"),
ActionSet.newInstance("dedup-similarity-result-levenstein","rawset_4921d674-aea3-4115-ad33-fe6833569176_1587984647217","dedup-similarity-result-levenstein"),
ActionSet.newInstance("dedup-similarity-person","","dedup-similarity-person"),
ActionSet.newInstance("iis-entities-software","rawset_c4b060b5-d620-45dd-9a0a-25befb23ef7c_1594030710782","entities_software"),
ActionSet.newInstance("iis-communities","rawset_4c632429-6f12-4f18-b54f-e60b346859d7_1594030710791","document_community"),
ActionSet.newInstance("scholexplorer-dump","rawset_d349ffdd-384a-47f6-986f-6c04edee3294_1592572750","scholexplorer-dump"),
ActionSet.newInstance("gridac-dump","rawset_a2854367-3586-4945-a124-1328e91568bd_1571646606840","gridac-dump"),
ActionSet.newInstance("doiboost-organizations","rawset_7626c52f-7f17-47f0-9094-da2c6b883d41_1574951682027","doiboost-organizations"),
ActionSet.newInstance("doiboost","rawset_handledb-a3ae-4d6e-8187-refereed6e18_15912730340000","doiboost"),
ActionSet.newInstance("orcidworks-no-doi","rawset_handledf-ef8d-4e1d-89b6-refereed6ce6_1574862348031","orcidworks-no-doi"),
ActionSet.newInstance("iis-entities-patent","rawset_4dfd999e-7e3c-48eb-be92-92393da81e19_1594030710806","entities_patent"),
ActionSet.newInstance("iis-referenced-patents","rawset_b4387e83-ed2c-4c4b-9d1d-4da35a6ad752_1594030710803","document_patent"),
ActionSet.newInstance("iis-covid-19","rawset_437833b5-b2ef-4e3f-8642-e845086ccc2c_1594030710809","document_covid19"),
ActionSet.newInstance("h2020programme","rawset_bcca8d44-6139-4aec-b579-761552440162_1590697435148","h2020programme"));
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(PrepareInfoTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareInfoTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PrepareInfoTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void testRelationsAS() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
new Gson().fromJson("[\"iis-referenced-projects-main\"]", List.class),
actionSetList);
pi.run();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ASResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/actionset")
.map(item -> OBJECT_MAPPER.readValue(item, ASResultInfo.class));
Dataset<ASResultInfo> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ASResultInfo.class));
verificationDataset.show(false);
// 1813019 unidirectional relations overall, asserted below
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
"FROM verificationDataset " +
"LATERAL VIEW EXPLODE(value) v as val"));
Assertions.assertEquals(1813019, verificationDataset.count());
Assertions.assertEquals(44804, verificationDataset.filter("type = 'result'").count());
Assertions.assertEquals(1768215, verificationDataset.filter("type = 'relation'").count());
Assertions.assertEquals(0, verify.filter("substr(id,1,3) = '40|'").count());
Assertions.assertEquals(1, verify.filter("id = '50|od________18::c8e57f11074407d59f7114f047afd54e'").count());
Assertions.assertEquals(1, verify.filter("id = '50|od________18::c8e57f11074407d59f7114f047afd54e' and type = 'relation'").count());
Assertions.assertEquals(1, verify.filter("id = '50|od________18::c8e57f11074407d59f7114f047afd54e' and value = '40|nsf_________::2bedb915e92b7dd25b082c6c2f241085'").count());
Assertions.assertEquals(2, verificationDataset.filter("id = '50|dedup_wf_001::1cba00616e303863c34fadaf797d0f8f'").count());
Assertions.assertEquals(1, verify.filter("id = '50|dedup_wf_001::1cba00616e303863c34fadaf797d0f8f' " +
"and type = 'relation' and value = '40|anr_________::5437f242b18aa615acf57dced27975c6'").count());
Assertions.assertEquals(1, verify.filter("id = '50|dedup_wf_001::1cba00616e303863c34fadaf797d0f8f' " +
"and type = 'relation' and value = '40|dfgf________::7aa0b0185d7db055823cd0734ddd6521'").count());
Assertions.assertEquals(1828481, verify.filter("prov = 'iis::document_referencedProjects'").count());
Assertions.assertEquals(1, verificationDataset.filter("type = 'result' and id = '50|coactionpubl::127fa9acff1ee8e86da354d1da2378da'").count());
Assertions.assertEquals(2, verify.filter("type = 'result' and id = '50|coactionpubl::127fa9acff1ee8e86da354d1da2378da'").count());
Assertions.assertEquals(1, verify.filter("type = 'result' and id = '50|coactionpubl::127fa9acff1ee8e86da354d1da2378da' and value = 'mes::projects::307'").count());
Assertions.assertEquals(1, verify.filter("type = 'result' and id = '50|coactionpubl::127fa9acff1ee8e86da354d1da2378da' and value = 'mes::projects::421'").count());
}
@Test
public void testInitiative() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
new Gson().fromJson("[\"iis-researchinitiative\"]", List.class),
actionSetList);
pi.run();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ASResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/actionset")
.map(item -> OBJECT_MAPPER.readValue(item, ASResultInfo.class));
Dataset<ASResultInfo> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ASResultInfo.class));
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
"FROM verificationDataset " +
"LATERAL VIEW EXPLODE(value) v as val"));
Assertions.assertEquals(14505, verificationDataset.filter("type = 'result'").count());
Assertions.assertEquals(0, verificationDataset.filter("type = 'relation'").count());
Assertions.assertEquals(0, verify.filter("prov != 'iis::document_research_initiative'").count());
Assertions.assertEquals(14639, verify.count());
Assertions.assertEquals(1, verify.filter("id = '50|_____OmicsDI::278d318ee9f051971236234b181d79ce'").count());
Assertions.assertEquals(1, verify.filter("id = '50|_____OmicsDI::278d318ee9f051971236234b181d79ce' and value = 'egi::virtual::10256'").count());
Assertions.assertEquals(1, verify.filter("id = '50|_____OmicsDI::278d318ee9f051971236234b181d79ce' and value = 'egi::virtual::10256' " +
"and trust = '0.9'").count());
Assertions.assertEquals(2, verify.filter("id = '50|dedup_wf_001::be0f7ddf838f07be2ab62e343244a255'").count());
Assertions.assertEquals(1, verify.filter("id = '50|dedup_wf_001::be0f7ddf838f07be2ab62e343244a255' and value = 'egi::virtual::150'").count());
Assertions.assertEquals(1, verify.filter("id = '50|dedup_wf_001::be0f7ddf838f07be2ab62e343244a255' and value = 'egi::virtual::71' " +
"and trust = '0.9'").count());
}
@Test
public void testCommunities() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
new Gson().fromJson("[\"iis-communities\"]", List.class),
actionSetList);
pi.run();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ASResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/actionset")
.map(item -> OBJECT_MAPPER.readValue(item, ASResultInfo.class));
Dataset<ASResultInfo> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ASResultInfo.class));
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
"FROM verificationDataset " +
"LATERAL VIEW EXPLODE(value) v as val"));
Assertions.assertEquals(0, verificationDataset.filter("type = 'relation'").count());
Assertions.assertEquals(0, verify.filter("prov != 'iis::document_community'").count());
Assertions.assertEquals(1129, verificationDataset.count());
Assertions.assertEquals(1395, verify.count());
Assertions.assertEquals(1, verify.filter("id = '50|core_ac_uk__::2aab0d504dae88edc6d7214f4ab62e4f'").count());
Assertions.assertEquals(1, verify.filter("id = '50|core_ac_uk__::2aab0d504dae88edc6d7214f4ab62e4f' and value = 'dariah' " +
"and trust = '0.9'").count());
Assertions.assertEquals(2, verify.filter("id = '50|core_ac_uk__::2e09ba90d7f712f24a4f48b39571f15f'").count());
Assertions.assertEquals(1, verify.filter("id = '50|core_ac_uk__::2e09ba90d7f712f24a4f48b39571f15f' and value = 'clarin'").count());
Assertions.assertEquals(1, verify.filter("id = '50|core_ac_uk__::2e09ba90d7f712f24a4f48b39571f15f' and value = 'dh-ch::subcommunity::2' " +
"and trust = '0.9'").count());
}
@Test
public void testCovid19() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
new Gson().fromJson("[\"iis-covid-19\"]", List.class),
actionSetList);
pi.run();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ASResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/actionset")
.map(item -> OBJECT_MAPPER.readValue(item, ASResultInfo.class));
Dataset<ASResultInfo> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ASResultInfo.class));
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
"FROM verificationDataset " +
"LATERAL VIEW EXPLODE(value) v as val"));
Assertions.assertEquals(0, verificationDataset.filter("type = 'relation'").count());
Assertions.assertEquals(0, verify.filter("prov != 'iis::document_covid19'").count());
Assertions.assertEquals(45093, verify.filter("value = 'covid-19'").count());
Assertions.assertEquals(45093, verificationDataset.count());
Assertions.assertEquals(3, verify.filter("id = '50|_____OmicsDI::039dbb63f11b19dc15113b34ebceb0d2' or " +
"id = '50|_____OmicsDI::05f133acca27d72866c6720a95515f57' or " +
"id = '50|_____OmicsDI::19c2cff8e86d7ae39f7e34f43ee06735'").count());
}
@Test
public void testAll() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
new Gson().fromJson("[\"iis-researchinitiative\",\"iis-referenced-projects-main\",\"iis-communities\",\"iis-covid-19\"]", List.class),
actionSetList);
pi.run();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ASResultInfo> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/actionset")
.map(item -> OBJECT_MAPPER.readValue(item, ASResultInfo.class));
Dataset<ASResultInfo> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ASResultInfo.class));
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
"FROM verificationDataset " +
"LATERAL VIEW EXPLODE(value) v as val"));
Assertions.assertEquals(1768215, verificationDataset.filter("type = 'relation'").count());
Assertions.assertEquals((45093 + 1129 + 14505 + 44804), verificationDataset.filter("type = 'result'").count());
}
@Test
public void testRelationMerged() {
PrepareInfo pi = new PrepareInfo(false,
workingDir.toString() + "/preparedInfo",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1/asInputPath").getPath(),
getClass().getResource("/eu/dnetlib/dhp/actionmanager/remapping/step1").getPath(),
new Gson().fromJson("[]", List.class),
actionSetList);
pi.run();
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
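// step 1 also writes, under /preparedInfo/relation, one RelationMerges record per dedup id with the list of merged ids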
JavaRDD<RelationMerges> tmp = sc
.textFile(workingDir.toString() + "/preparedInfo/relation")
.map(item -> OBJECT_MAPPER.readValue(item, RelationMerges.class));
Dataset<RelationMerges> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(RelationMerges.class));
verificationDataset.show(false);
Assertions.assertEquals(3, verificationDataset.count());
verificationDataset.createOrReplaceTempView("verificationDataset");
Dataset<Row> verify = spark.sql("Select dedupId, me merged " +
"from verificationDataset " +
"lateral view explode(merges) m as me");
Assertions.assertEquals(8, verify.count());
Assertions.assertEquals(2 , verify.filter("dedupId = '50|dedup_wf_001::1cba00616e303863c34fadaf797d0f8f'").count());
Assertions.assertEquals(2 , verify.filter("dedupId = '50|dedup_wf_001::7df4b3b26df271628a837c209516902a'").count());
Assertions.assertEquals(4 , verify.filter("dedupId = '50|dedup_wf_001::b04d742132c133177e996add1325ec04'").count());
Assertions.assertEquals(2, verify.filter("dedupId = '50|dedup_wf_001::1cba00616e303863c34fadaf797d0f8f' " +
"and (me ='50|base_oa_____::fb2c70723d74f45329640255a959333d' or me = '50|doiboost____::fb2c70723d74f45329640255a959333d')").count());
Assertions.assertEquals(2 , verify.filter("dedupId = '50|dedup_wf_001::7df4b3b26df271628a837c209516902a' " +
"and (me ='50|doiboost____::78329557c23bee513963ebf295d1434d' or me = '50|doiboost____::8978b9b797294da5306950a94a58d98c')").count());
Assertions.assertEquals(4 , verify.filter("dedupId = '50|dedup_wf_001::b04d742132c133177e996add1325ec04' " +
"and (me = '50|od______3515::779de9b3a2d224779be52fae43b5fc80' or me = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
"or me = '50|od_______166::779de9b3a2d224779be52fae43b5fc80' or me = '50|od_______165::779de9b3a2d224779be52fae43b5fc80')").count());
}
}
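
Neither RelationMerges nor ActionSet is shown in this diff. From the newInstance calls in the fixture and the dedupId/merges query in testRelationMerged, minimal sketches could read as follows; the name of the third ActionSet field is a guess (its values match the inference-provenance suffixes seen in the assertions).

package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;
import java.util.List;

// Hypothetical sketch: a dedup representative and the ids it merges.
public class RelationMerges implements Serializable {

    private String dedupId;      // e.g. "50|dedup_wf_001::..."
    private List<String> merges; // exploded as "me" in testRelationMerged

    public String getDedupId() { return dedupId; }
    public void setDedupId(String dedupId) { this.dedupId = dedupId; }
    public List<String> getMerges() { return merges; }
    public void setMerges(List<String> merges) { this.merges = merges; }
}

// In a separate file: the action-set descriptor built via newInstance in the fixture.
public class ActionSet implements Serializable {

    private String name;      // e.g. "iis-referenced-projects-main"
    private String rawset;    // latest raw set id, e.g. "rawset_ccf0d39d-..."
    private String directory; // hypothetical field name; values match the provenance suffix, e.g. "document_referencedProjects"

    public static ActionSet newInstance(String name, String rawset, String directory) {
        ActionSet as = new ActionSet();
        as.name = name;
        as.rawset = rawset;
        as.directory = directory;
        return as;
    }
}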

View File: …/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/RedistributeIISResultTest.java

@@ -1,4 +1,199 @@
package eu.dnetlib.dhp.actionmanager.remapping;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class RedistributeIISResultTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = RedistributeIISResultTest.class
.getClassLoader();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(RedistributeIISResultTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(RedistributeIISResultTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(RedistributeIISResultTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(RedistributeIISResultTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void redistributeRelationTest() throws Exception {
SparkRedistributeIISRelations
.main(
new String[]{
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-asInputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/actionset")
.getPath(),
"-outputPath",
workingDir.toString() + "/relationActionSet",
"-inputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/result")
.getPath()
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
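// the action set is stored as a SequenceFile of (Text, Text) pairs whose values are JSON-serialized AtomicAction wrappers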
JavaRDD<Relation> tmp = sc
.sequenceFile(workingDir.toString() + "/relationActionSet", Text.class, Text.class)
.map(item -> OBJECT_MAPPER.readValue(item._2().toString(), AtomicAction.class))
.map(aa -> ((Relation)aa.getPayload()));
Dataset<Relation> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
Assertions.assertEquals(14, verificationDataset.count());
Assertions.assertEquals(3, verificationDataset.filter("source = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' and " +
"(target = '40|rcuk________::8dec51859e6b66cd040670b432b9e59c' or " +
"target = '40|rcuk________::5e312e08bd65f126d7d79b3d1d677eb3' or " +
"target = '40|corda_______::6d500f8fceb2bb81b0750820469e1cd8')").count());
Assertions.assertEquals(2, verificationDataset.filter("source = '50|doiboost____::fb2c70723d74f45329640255a959333d' and " +
"(target = '40|dfgf________::7aa0b0185d7db055823cd0734ddd6521' or target = '40|anr_________::5437f242b18aa615acf57dced27975c6') ").count());
Assertions.assertEquals(2, verificationDataset.filter("source = '50|doiboost____::8978b9b797294da5306950a94a58d98c' and " +
"(target = '40|anr_________::55e85886263bf5abe9e28ba4fda9f4ce' or target = '40|anr_________::5eafc553789cd97a12cab7ed1742e2ca') ").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|dfgf________::7aa0b0185d7db055823cd0734ddd6521'").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|anr_________::5437f242b18aa615acf57dced27975c6'").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|anr_________::5eafc553789cd97a12cab7ed1742e2ca'").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|anr_________::55e85886263bf5abe9e28ba4fda9f4ce'").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|corda_______::6d500f8fceb2bb81b0750820469e1cd8'").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|rcuk________::5e312e08bd65f126d7d79b3d1d677eb3'").count());
Assertions.assertEquals(1, verificationDataset.filter("target = '40|rcuk________::8dec51859e6b66cd040670b432b9e59c'").count());
}
@Test
public void redistributeResultTest() throws Exception {
SparkRedistributeIISResult
.main(
new String[]{
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-asInputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/actionset")
.getPath(),
"-outputPath",
workingDir.toString() + "/resultActionSet",
"-inputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step4/result")
.getPath()
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Result> tmp = sc
.sequenceFile(workingDir.toString() + "/resultActionSet", Text.class, Text.class)
.map(item -> OBJECT_MAPPER.readValue(item._2().toString(), AtomicAction.class))
.map(aa -> ((Result)aa.getPayload()));
Dataset<Result> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class));
Assertions.assertEquals(2, verificationDataset.count());
verificationDataset.createOrReplaceTempView("verificationDataset");
// Assertions.assertEquals(0, verificationDataset.filter("substr(id,1,8) = '50|dedup'").count());
//
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a'").count());
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|od_______166::779de9b3a2d224779be52fae43b5fc80'").count());
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|od_______165::779de9b3a2d224779be52fae43b5fc80'").count());
// Assertions.assertEquals(3, verificationDataset.filter("id = '50|od______3515::779de9b3a2d224779be52fae43b5fc80'").count());
//
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::78329557c23bee513963ebf295d1434d'").count());
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::8978b9b797294da5306950a94a58d98c'").count());
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|doiboost____::fb2c70723d74f45329640255a959333d'").count());
// Assertions.assertEquals(2, verificationDataset.filter("id = '50|base_oa_____::fb2c70723d74f45329640255a959333d'").count());
//
// Assertions.assertEquals(5, verificationDataset.filter("id = '50|_____OmicsDI::039dbb63f11b19dc15113b34ebceb0d2' " +
// "or id = '50|_____OmicsDI::05f133acca27d72866c6720a95515f57' or " +
// "id = '50|_____OmicsDI::2d508eba981699a30e969d1ab5a068b8' or " +
// "id = '50|datacite____::00bddedc38dc045780dc84c27bc8fecd' or " +
// "id = '50|datacite____::00f7f89392fa75e944dc8d329e9e8024'").count());
//
//
// verificationDataset.createOrReplaceTempView("verificationDataset");
//
// Dataset<Row> verify = spark.sql(("SELECT id, type, val.value value, val.trust trust, val.inference_provenance prov " +
// "FROM verificationDataset " +
// "LATERAL VIEW EXPLODE(value) v as val"));
//
// Assertions.assertEquals(25, verify.count());
//
// Assertions.assertEquals(20, verify.filter("type = 'relation'").count());
// Assertions.assertEquals(5, verify.filter("type = 'result'").count());
//
// Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
// "and value = '40|rcuk________::8dec51859e6b66cd040670b432b9e59c' and " +
// "prov = 'iis::document_referencedProjects' and " +
// "trust = '0.897'").count());
//
// Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
// "and value = '40|rcuk________::5e312e08bd65f126d7d79b3d1d677eb3' and " +
// "prov = 'iis::document_referencedProjects' and " +
// "trust = '0.897'").count());
//
// Assertions.assertEquals(1, verify.filter("id = '50|doiboost____::0f10b8f21b7925a344f41edb774f0b0a' " +
// "and value = '40|corda_______::6d500f8fceb2bb81b0750820469e1cd8' and " +
// "prov = 'iis::document_referencedProjects' and " +
// "trust = '0.7085'").count());
}
}
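
The SparkRedistributeIIS* jobs under test are not part of this diff. The tests read their output back as SequenceFiles of (Text, Text) pairs holding JSON-serialized AtomicAction objects; a rough sketch of the matching write side, assuming these jobs follow the same convention as other dnet-hadoop action-set writers, could be:

package eu.dnetlib.dhp.actionmanager.remapping;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Hypothetical sketch, not the committed job: it shows only the serialization convention
// that redistributeRelationTest reads back with sc.sequenceFile(..., Text.class, Text.class).
public class WriteActionSetSketch {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void save(JavaRDD<Relation> relations, String outputPath) {
        relations
            .map(r -> new AtomicAction<>(Relation.class, r))     // wrap each payload
            .mapToPair(aa -> new Tuple2<>(
                new Text(aa.getClazz().getCanonicalName()),      // key: payload class name
                new Text(OBJECT_MAPPER.writeValueAsString(aa)))) // value: JSON of the wrapper
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
    }
}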

View File: …/src/test/java/eu/dnetlib/dhp/actionmanager/remapping/SelectResultTest.java

@@ -1,4 +1,96 @@
package eu.dnetlib.dhp.actionmanager.remapping;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class SelectResultTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ClassLoader cl = SelectResultTest.class
.getClassLoader();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(SelectResultTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(SelectResultTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(SelectResultTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(SelectResultTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void testSelectResult() throws Exception {
SparkSelectResults
.main(
new String[]{
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-inputPath",
getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/remapping/step3")
.getPath(),
"-outputPath",
workingDir.toString() + "/selectedResults"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ResultPid> tmp = sc
.textFile(workingDir.toString() + "/selectedResults")
.map(item -> OBJECT_MAPPER.readValue(item, ResultPid.class));
Dataset<ResultPid> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(ResultPid.class));
verificationDataset.show(false);
Assertions.assertEquals(5, verificationDataset.count());
Assertions.assertEquals(3, verificationDataset.filter("substr(resultId,1,11) = '50|doiboost'").count());
Assertions.assertEquals(2, verificationDataset.filter("substr(resultId,1,11) = '50|datacite'").count());
}
}
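
ResultPid is likewise absent from the diff; only its resultId field is visible in the assertions. A minimal sketch, with the pid field added purely as an illustrative guess:

package eu.dnetlib.dhp.actionmanager.remapping;

import java.io.Serializable;

// Hypothetical sketch: only resultId is confirmed by the test; the pid field is a guess.
public class ResultPid implements Serializable {

    private String resultId; // e.g. "50|doiboost____::..." or "50|datacite____::..."
    private String pid;      // hypothetical: the persistent identifier selected for the result

    public String getResultId() { return resultId; }
    public void setResultId(String resultId) { this.resultId = resultId; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
}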