forked from D-Net/dnet-hadoop
test for bulktag
This commit is contained in:
parent 259525cb93
commit 3b2e4ab670
BulkTagJobTest.java
@@ -1,13 +1,24 @@
 package eu.dnetlib.dhp;
 
+import static eu.dnetlib.dhp.community.TagginConstants.ZENODO_COMMUNITY_INDICATOR;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Software;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mortbay.util.IO;
@@ -31,7 +42,7 @@ public class BulkTagJobTest {
             taggingConf =
                     IO.toString(
                             BulkTagJobTest.class.getResourceAsStream(
-                                    "/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json"));
+                                    "/eu/dnetlib/dhp/communityconfiguration/tagging_conf.xml"));
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -66,7 +77,7 @@ public class BulkTagJobTest {
     }
 
     @Test
-    public void test1() throws Exception {
+    public void noUpdatesTest() throws Exception {
         SparkBulkTagJob2.main(
                 new String[] {
                     "-isTest",
@@ -74,7 +85,7 @@ public class BulkTagJobTest {
                     "-isSparkSessionManaged",
                     Boolean.FALSE.toString(),
                     "-sourcePath",
-                    getClass().getResource("/eu/dnetlib/dhp/sample/dataset").getPath(),
+                    getClass().getResource("/eu/dnetlib/dhp/sample/dataset/no_updates").getPath(),
                     "-taggingConf",
                     taggingConf,
                     "-resultTableName",
@@ -92,142 +103,700 @@ public class BulkTagJobTest {
                     //        "-preparedInfoPath",
                     //        getClass().getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/preparedInfo").getPath()
                 });
 
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Dataset> tmp =
+                sc.textFile(workingDir.toString() + "/dataset")
+                        .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+        verificationDataset.createOrReplaceTempView("dataset");
+
+        String query =
+                "select id, MyT.id community "
+                        + "from dataset "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
+
+        Assertions.assertEquals(0, spark.sql(query).count());
     }
-}
 
-/*
-
-import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidPropagationJobTest;
-import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import org.apache.commons.io.FileUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-
-import static org.apache.spark.sql.functions.desc;
-
     @Test
-    public void test1() throws Exception {
-        SparkResultToCommunityThroughSemRelJob4.main(new String[]{
-                "-isTest", Boolean.TRUE.toString(),
-                "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                "-sourcePath", getClass().getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/sample").getPath(),
-                "-hive_metastore_uris", "",
-                "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
-                "-outputPath", workingDir.toString() + "/dataset",
-                "-preparedInfoPath", getClass().getResource("/eu/dnetlib/dhp/resulttocommunityfromsemrel/preparedInfo").getPath()
+    public void bulktagBySubjectNoPreviousContextTest() throws Exception {
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass()
+                            .getResource("/eu/dnetlib/dhp/sample/dataset/update_subject/nocontext")
+                            .getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Dataset",
+                    "-outputPath",
+                    workingDir.toString() + "/dataset",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
                 });
 
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-        JavaRDD<Dataset> tmp = sc.textFile(workingDir.toString() + "/dataset")
+        JavaRDD<Dataset> tmp =
+                sc.textFile(workingDir.toString() + "/dataset")
                         .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
 
         Assertions.assertEquals(10, tmp.count());
-        org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
 
         verificationDataset.createOrReplaceTempView("dataset");
 
-        String query = "select id, MyT.id community " +
-                "from dataset " +
-                "lateral view explode(context) c as MyT " +
-                "lateral view explode(MyT.datainfo) d as MyD " +
-                "where MyD.inferenceprovenance = 'propagation'";
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+                        + "from dataset "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
 
-        org.apache.spark.sql.Dataset<Row> resultExplodedProvenance = spark.sql(query);
-        Assertions.assertEquals(5, resultExplodedProvenance.count());
+        Assertions.assertEquals(5, spark.sql(query).count());
 
-        Assertions.assertEquals(0, resultExplodedProvenance.filter("id = '50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b'").count());
-
-        Assertions.assertEquals(1, resultExplodedProvenance.filter("id = '50|dedup_wf_001::0489ae524201eedaa775da282dce35e7'").count());
-        Assertions.assertEquals("dh-ch", resultExplodedProvenance.select("community")
-                .where(resultExplodedProvenance.col("id").equalTo("50|dedup_wf_001::0489ae524201eedaa775da282dce35e7"))
-                .collectAsList().get(0).getString(0));
-
-        Assertions.assertEquals(3, resultExplodedProvenance.filter("id = '50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28'").count());
-        List<Row> rowList = resultExplodedProvenance.select("community")
-                .where(resultExplodedProvenance.col("id")
-                        .equalTo("50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28"))
-                .sort(desc("community")).collectAsList();
-        Assertions.assertEquals("mes", rowList.get(0).getString(0));
-        Assertions.assertEquals("fam", rowList.get(1).getString(0));
-        Assertions.assertEquals("ee", rowList.get(2).getString(0));
-
-        Assertions.assertEquals(1, resultExplodedProvenance.filter("id = '50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc'").count());
-        Assertions.assertEquals("aginfra", resultExplodedProvenance.select("community")
-                .where(resultExplodedProvenance.col("id")
-                        .equalTo("50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc"))
-                .collectAsList().get(0).getString(0));
-
-        query = "select id, MyT.id community " +
-                "from dataset " +
-                "lateral view explode(context) c as MyT " +
-                "lateral view explode(MyT.datainfo) d as MyD ";
-
-        org.apache.spark.sql.Dataset<Row> resultCommunityId = spark.sql(query);
-
-        Assertions.assertEquals(10, resultCommunityId.count());
-
-        Assertions.assertEquals(2, resultCommunityId.filter("id = '50|dedup_wf_001::0489ae524201eedaa775da282dce35e7'").count());
-        rowList = resultCommunityId.select("community")
-                .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::0489ae524201eedaa775da282dce35e7"))
-                .sort(desc("community"))
-                .collectAsList();
-        Assertions.assertEquals("dh-ch", rowList.get(0).getString(0));
-        Assertions.assertEquals("beopen", rowList.get(1).getString(0));
-
-        Assertions.assertEquals(3, resultCommunityId.filter("id = '50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28'").count());
-        rowList = resultCommunityId.select("community")
-                .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::0a60e33b4f0986ebd9819451f2d87a28"))
-                .sort(desc("community"))
-                .collectAsList();
-        Assertions.assertEquals("mes", rowList.get(0).getString(0));
-        Assertions.assertEquals("fam", rowList.get(1).getString(0));
-        Assertions.assertEquals("ee", rowList.get(2).getString(0));
-
-        Assertions.assertEquals(2, resultCommunityId.filter("id = '50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc'").count());
-        rowList = resultCommunityId.select("community")
-                .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::0ae02edb5598a5545d10b107fcf48dcc"))
-                .sort(desc("community"))
-                .collectAsList();
-        Assertions.assertEquals("beopen", rowList.get(0).getString(0));
-        Assertions.assertEquals("aginfra", rowList.get(1).getString(0));
-
-        Assertions.assertEquals(2, resultCommunityId.filter("id = '50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b'").count());
-        rowList = resultCommunityId.select("community")
-                .where(resultCommunityId.col("id").equalTo("50|dedup_wf_001::2305908abeca9da37eaf3bddcaf81b7b"))
-                .sort(desc("community"))
-                .collectAsList();
-        Assertions.assertEquals("euromarine", rowList.get(1).getString(0));
-        Assertions.assertEquals("ni", rowList.get(0).getString(0));
-
-        Assertions.assertEquals(1, resultCommunityId.filter("id = '50|doajarticles::8d817039a63710fcf97e30f14662c6c8'").count());
-        Assertions.assertEquals("euromarine", resultCommunityId.select("community")
-                .where(resultCommunityId.col("id")
-                        .equalTo("50|doajarticles::8d817039a63710fcf97e30f14662c6c8"))
-                .collectAsList().get(0).getString(0));
-
+        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+        Assertions.assertEquals(
+                5, idExplodeCommunity.filter("provenance = 'community:subject'").count());
+        Assertions.assertEquals(
+                5,
+                idExplodeCommunity.filter("name = 'Bulktagging for Community - Subject'").count());
+
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count());
+
+        Assertions.assertEquals(
+                1,
+                idExplodeCommunity
+                        .filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'")
+                        .count());
+        Assertions.assertEquals(
+                1,
+                idExplodeCommunity
+                        .filter(
+                                "community = 'covid-19' and id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'")
+                        .count());
+
+        Assertions.assertEquals(
+                2,
+                idExplodeCommunity
+                        .filter("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'")
+                        .count());
+        Assertions.assertEquals(
+                2,
+                idExplodeCommunity
+                        .filter(
+                                "(community = 'covid-19' or community = 'aginfra') and id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'")
+                        .count());
+
+        Assertions.assertEquals(
+                2,
+                idExplodeCommunity
+                        .filter("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'")
+                        .count());
+        Assertions.assertEquals(
+                2,
+                idExplodeCommunity
+                        .filter(
+                                "(community = 'mes' or community = 'fam') and id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'")
+                        .count());
     }
-}
-*/
+
+    @Test
+    public void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass()
+                            .getResource(
+                                    "/eu/dnetlib/dhp/sample/dataset/update_subject/contextnoprovenance")
+                            .getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Dataset",
+                    "-outputPath",
+                    workingDir.toString() + "/dataset",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Dataset> tmp =
+                sc.textFile(workingDir.toString() + "/dataset")
+                        .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+        verificationDataset.createOrReplaceTempView("dataset");
+
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance "
+                        + "from dataset "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyT.id = 'covid-19' ";
+
+        Assertions.assertEquals(3, spark.sql(query).count());
+
+        org.apache.spark.sql.Dataset<Row> communityContext = spark.sql(query);
+
+        Assertions.assertEquals(
+                2,
+                communityContext
+                        .filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'")
+                        .count());
+        Assertions.assertEquals(
+                1,
+                communityContext
+                        .filter(
+                                "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'community:subject'")
+                        .count());
+        Assertions.assertEquals(
+                1,
+                communityContext
+                        .filter(
+                                "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'propagation:community:productsthroughsemrel'")
+                        .count());
+
+        query =
+                "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+                        + "from dataset "
+                        + "lateral view explode (context) as MyT "
+                        + "where size(MyT.datainfo) > 0";
+
+        Assertions.assertEquals(
+                2,
+                spark.sql(query)
+                        .select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' a"
+                                        + "nd community = 'covid-19'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+    }
+
+    @Test
+    public void bulktagByDatasourceTest() throws Exception {
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass()
+                            .getResource("/eu/dnetlib/dhp/sample/publication/update_datasource")
+                            .getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Publication",
+                    "-outputPath",
+                    workingDir.toString() + "/publication",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Publication> tmp =
+                sc.textFile(workingDir.toString() + "/publication")
+                        .map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<Publication> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
+
+        verificationDataset.createOrReplaceTempView("publication");
+
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+                        + "from publication "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
+
+        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+
+        Assertions.assertEquals(5, idExplodeCommunity.count());
+        Assertions.assertEquals(
+                5, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
+        Assertions.assertEquals(
+                5,
+                idExplodeCommunity
+                        .filter("name = 'Bulktagging for Community - Datasource'")
+                        .count());
+
+        Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'fam'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count());
+
+        Assertions.assertEquals(
+                3,
+                idExplodeCommunity
+                        .filter(
+                                "community = 'fam' and (id = '50|ec_fp7health::000085c89f4b96dc2269bd37edb35306' "
+                                        + "or id = '50|ec_fp7health::000b9e61f83f5a4b0c35777b7bccdf38' "
+                                        + "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')")
+                        .count());
+
+        Assertions.assertEquals(
+                2,
+                idExplodeCommunity
+                        .filter(
+                                "community = 'aginfra' and (id = '50|ec_fp7health::000c8195edd542e4e64ebb32172cbf89' "
+                                        + "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')")
+                        .count());
+    }
+
+    @Test
+    public void bulktagByZenodoCommunityTest() throws Exception {
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass()
+                            .getResource(
+                                    "/eu/dnetlib/dhp/sample/otherresearchproduct/update_zenodocommunity")
+                            .getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
+                    "-outputPath",
+                    workingDir.toString() + "/orp",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<OtherResearchProduct> tmp =
+                sc.textFile(workingDir.toString() + "/orp")
+                        .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<OtherResearchProduct> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(OtherResearchProduct.class));
+
+        verificationDataset.createOrReplaceTempView("orp");
+
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+                        + "from orp "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
+
+        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+        Assertions.assertEquals(8, idExplodeCommunity.count());
+
+        Assertions.assertEquals(
+                8, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count());
+        Assertions.assertEquals(
+                8,
+                idExplodeCommunity.filter("name = 'Bulktagging for Community - Zenodo'").count());
+
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'covid-19'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'beopen'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'mes'").count());
+
+        Assertions.assertEquals(
+                1,
+                idExplodeCommunity
+                        .filter(
+                                "id = '50|od______2017::0750a4d0782265873d669520f5e33c07' "
+                                        + "and community = 'covid-19'")
+                        .count());
+        Assertions.assertEquals(
+                3,
+                idExplodeCommunity
+                        .filter(
+                                "id = '50|od______2017::1bd97baef19dbd2db3203b112bb83bc5' and "
+                                        + "(community = 'aginfra' or community = 'mes' or community = 'fam')")
+                        .count());
+        Assertions.assertEquals(
+                1,
+                idExplodeCommunity
+                        .filter(
+                                "id = '50|od______2017::1e400f1747487fd15998735c41a55c72' "
+                                        + "and community = 'beopen'")
+                        .count());
+        Assertions.assertEquals(
+                3,
+                idExplodeCommunity
+                        .filter(
+                                "id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' and "
+                                        + "(community = 'beopen' or community = 'fam' or community = 'mes')")
+                        .count());
+
+        query =
+                "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+                        + "from orp "
+                        + "lateral view explode (context) as MyT "
+                        + "where size(MyT.datainfo) > 0";
+
+        Assertions.assertEquals(
+                2,
+                spark.sql(query)
+                        .select("datainfosize")
+                        .where(
+                                "id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' a"
+                                        + "nd community = 'beopen'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+
+        // verify the zenodo community context is not present anymore in the records
+        query =
+                "select id, MyT.id community "
+                        + "from orp "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD ";
+
+        org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);
+
+        Assertions.assertEquals(
+                0,
+                tmp2.select("community")
+                        .where(tmp2.col("community").contains(ZENODO_COMMUNITY_INDICATOR))
+                        .count());
+    }
+
+    @Test
+    public void bulktagBySubjectDatasourceTest() throws Exception {
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass()
+                            .getResource("/eu/dnetlib/dhp/sample/dataset/update_subject_datasource")
+                            .getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Dataset",
+                    "-outputPath",
+                    workingDir.toString() + "/dataset",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Dataset> tmp =
+                sc.textFile(workingDir.toString() + "/dataset")
+                        .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+        verificationDataset.createOrReplaceTempView("dataset");
+
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+                        + "from dataset "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
+
+        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+        Assertions.assertEquals(7, idExplodeCommunity.count());
+
+        Assertions.assertEquals(
+                5, idExplodeCommunity.filter("provenance = 'community:subject'").count());
+        Assertions.assertEquals(
+                2, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count());
+        Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count());
+
+        query =
+                "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+                        + "from dataset "
+                        + "lateral view explode (context) as MyT "
+                        + "where size(MyT.datainfo) > 0";
+
+        org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);
+
+        Assertions.assertEquals(
+                2,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and "
+                                        + "community = 'aginfra'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+
+        Assertions.assertEquals(
+                1,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and "
+                                        + "community = 'covid-19'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+
+        Assertions.assertEquals(
+                2,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and "
+                                        + "community = 'fam'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+        Assertions.assertEquals(
+                2,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and "
+                                        + "community = 'covid-19'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+
+        Assertions.assertEquals(
+                1,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and "
+                                        + "community = 'fam'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+        Assertions.assertEquals(
+                1,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and "
+                                        + "community = 'mes'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+    }
+
+    @Test
+    public void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception {
+
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass().getResource("/eu/dnetlib/dhp/sample/software/").getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Software",
+                    "-outputPath",
+                    workingDir.toString() + "/software",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Software> tmp =
+                sc.textFile(workingDir.toString() + "/software")
+                        .map(item -> OBJECT_MAPPER.readValue(item, Software.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<Software> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Software.class));
+
+        verificationDataset.createOrReplaceTempView("software");
+
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+                        + "from software "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
+
+        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+        Assertions.assertEquals(10, idExplodeCommunity.count());
+
+        idExplodeCommunity.show(false);
+        Assertions.assertEquals(
+                3, idExplodeCommunity.filter("provenance = 'community:subject'").count());
+        Assertions.assertEquals(
+                3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
+        Assertions.assertEquals(
+                4, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count());
+
+        Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'covid-19'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dh-ch'").count());
+        Assertions.assertEquals(4, idExplodeCommunity.filter("community = 'aginfra'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dariah'").count());
+        Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count());
+
+        Assertions.assertEquals(
+                2,
+                idExplodeCommunity
+                        .filter(
+                                "provenance = 'community:zenodocommunity' and "
+                                        + "id = '50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4' and ("
+                                        + "community = 'dh-ch' or community = 'dariah')")
+                        .count());
+
+        query =
+                "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+                        + "from software "
+                        + "lateral view explode (context) as MyT "
+                        + "where size(MyT.datainfo) > 0";
+
+        org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);
+
+        Assertions.assertEquals(
+                2,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______1582::501b25d420f808c8eddcd9b16e917f11' and "
+                                        + "community = 'covid-19'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+
+        Assertions.assertEquals(
+                3,
+                tmp2.select("datainfosize")
+                        .where(
+                                "id = '50|od______1582::581621232a561b7e8b4952b18b8b0e56' and "
+                                        + "community = 'aginfra'")
+                        .collectAsList()
+                        .get(0)
+                        .getInt(0));
+    }
+
+    @Test
+    public void bulktagDatasourcewithConstraintsTest() throws Exception {
+
+        SparkBulkTagJob2.main(
+                new String[] {
+                    "-isTest",
+                    Boolean.TRUE.toString(),
+                    "-isSparkSessionManaged",
+                    Boolean.FALSE.toString(),
+                    "-sourcePath",
+                    getClass()
+                            .getResource(
+                                    "/eu/dnetlib/dhp/sample/dataset/update_datasourcewithconstraints")
+                            .getPath(),
+                    "-taggingConf",
+                    taggingConf,
+                    "-resultTableName",
+                    "eu.dnetlib.dhp.schema.oaf.Dataset",
+                    "-outputPath",
+                    workingDir.toString() + "/dataset",
+                    "-isLookupUrl",
+                    "http://beta.services.openaire.eu:8280/is/services/isLookUp",
+                    "-protoMap",
+                    "{ \"author\" : \"$['author'][*]['fullname']\","
+                            + " \"title\" : \"$['title'][*]['value']\","
+                            + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+                            + " \"contributor\" : \"$['contributor'][*]['value']\","
+                            + " \"description\" : \"$['description'][*]['value']\"}"
+                });
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Dataset> tmp =
+                sc.textFile(workingDir.toString() + "/dataset")
+                        .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+
+        Assertions.assertEquals(10, tmp.count());
+        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
+                spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
+
+        verificationDataset.createOrReplaceTempView("dataset");
+        String query =
+                "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+                        + "from dataset "
+                        + "lateral view explode(context) c as MyT "
+                        + "lateral view explode(MyT.datainfo) d as MyD "
+                        + "where MyD.inferenceprovenance = 'bulktagging'";
+
+        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+
+        idExplodeCommunity.show(false);
+        Assertions.assertEquals(2, idExplodeCommunity.count());
+
+        Assertions.assertEquals(
+                2, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
+    }
+}
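Every new test above follows the same verification pattern: read the job output back as the corresponding OAF bean, register it as a temporary view, and explode the nested context/datainfo arrays with Spark SQL to inspect the provenance of each community tag. The following standalone sketch condenses that pattern; the class name, the local master, and the /tmp/dataset path are illustrative stand-ins, not part of the commit:

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class BulkTagVerificationSketch {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) {
        SparkSession spark =
                SparkSession.builder().appName("verify-bulktag").master("local[*]").getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // the job writes one JSON record per line; "/tmp/dataset" stands in for workingDir
        JavaRDD<Dataset> results =
                sc.textFile("/tmp/dataset")
                        .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        spark.createDataset(results.rdd(), Encoders.bean(Dataset.class))
                .createOrReplaceTempView("dataset");

        // explode the nested context/datainfo arrays and keep only bulk-tagged entries
        long tagged =
                spark.sql(
                                "select id, MyT.id community, MyD.provenanceaction.classid provenance "
                                        + "from dataset "
                                        + "lateral view explode(context) c as MyT "
                                        + "lateral view explode(MyT.datainfo) d as MyD "
                                        + "where MyD.inferenceprovenance = 'bulktagging'")
                        .count();
        System.out.println("bulk-tagged context entries: " + tagged);

        spark.stop();
    }
}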
CommunityConfigurationFactoryTest.java
@@ -18,9 +18,6 @@ import org.junit.jupiter.api.Test;
 /** Created by miriam on 03/08/2018. */
 public class CommunityConfigurationFactoryTest {
 
-    private static String xml;
-    private static String xml1;
-
     private final VerbResolver resolver = new VerbResolver();
 
     @Test
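For context on the -protoMap argument that every test invocation above passes: it is a JSON object mapping field names to JSONPath expressions, evaluated against each result record to extract the values the tagger matches against community criteria. A minimal, hypothetical illustration using the com.jayway.jsonpath library (the sample record is invented, and whether SparkBulkTagJob2 uses this exact library is not shown in this diff):

import com.jayway.jsonpath.JsonPath;
import java.util.List;

public class ProtoMapSketch {
    public static void main(String[] args) {
        // invented record, shaped like the fields the protoMap expressions refer to
        String json =
                "{\"author\":[{\"fullname\":\"Doe, Jane\"}],"
                        + "\"title\":[{\"value\":\"On bulk tagging\"}]}";

        // the same JSONPath expressions passed via -protoMap in the tests above
        List<String> authors = JsonPath.read(json, "$['author'][*]['fullname']");
        List<String> titles = JsonPath.read(json, "$['title'][*]['value']");

        System.out.println(authors); // [Doe, Jane]
        System.out.println(titles);  // [On bulk tagging]
    }
}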
(One additional file diff suppressed because it is too large; nine binary files changed but are not shown.)