[EOSCTag] added resources and finalized test for Jupyter Notebook tagging

This commit is contained in:
Miriam Baglioni 2022-04-22 17:38:10 +02:00
parent 87bff36d9e
commit 27c85e901a
10 changed files with 165 additions and 676 deletions

View File

@ -2,15 +2,13 @@
package eu.dnetlib.dhp.bulktag; package eu.dnetlib.dhp.bulktag;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -24,6 +22,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List;
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR; import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR;
@ -31,13 +30,7 @@ public class EOSCTagJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp";
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
+ " \"title\" : \"$['title'][*]['value']\","
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
+ " \"contributor\" : \"$['contributor'][*]['value']\","
+ " \"description\" : \"$['description'][*]['value']\"}";
private static SparkSession spark; private static SparkSession spark;
@ -45,19 +38,6 @@ public class EOSCTagJobTest {
private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class); private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class);
private static String taggingConf = "";
static {
try {
taggingConf = IOUtils
.toString(
EOSCTagJobTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml"));
} catch (IOException e) {
e.printStackTrace();
}
}
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
@ -88,683 +68,102 @@ public class EOSCTagJobTest {
} }
@Test @Test
void noUpdatesTest() throws Exception { void jupyterUpdatesTest() throws Exception {
final String pathMap = EOSCTagJobTest.pathMap;
SparkBulkTagJob spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/software").getPath())
.map((MapFunction<String, Software>) value -> OBJECT_MAPPER.readValue(value, Software.class), Encoders.bean(Software.class))
.write()
.option("compression","gzip")
.json(workingDir.toString() + "/input/software");
spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/dataset").getPath())
.map((MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class), Encoders.bean(Dataset.class))
.write()
.option("compression","gzip")
.json(workingDir.toString() + "/input/dataset");
spark.read().textFile(getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/otherresearchproduct").getPath())
.map((MapFunction<String, OtherResearchProduct>) value -> OBJECT_MAPPER.readValue(value, OtherResearchProduct.class), Encoders.bean(OtherResearchProduct.class))
.write()
.option("compression","gzip")
.json(workingDir.toString() + "/input/otherresearchproduct");
SparkEoscTag
.main( .main(
new String[] { new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", "-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(), workingDir.toString() + "/input",
"-taggingConf", taggingConf, "-workingPath", workingDir.toString() + "/working"
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
Assertions.assertEquals(0, spark.sql(query).count());
}
@Test
void bulktagBySubjectNoPreviousContextTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext")
.getPath();
final String pathMap = EOSCTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
Assertions.assertEquals(5, spark.sql(query).count());
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
Assertions
.assertEquals(
5, idExplodeCommunity.filter("provenance = 'community:subject'").count());
Assertions
.assertEquals(
5,
idExplodeCommunity.filter("name = 'Bulktagging for Community - Subject'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count());
Assertions
.assertEquals(
1,
idExplodeCommunity
.filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'")
.count());
Assertions
.assertEquals(
1,
idExplodeCommunity
.filter(
"community = 'covid-19' and id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'")
.count());
Assertions
.assertEquals(
2,
idExplodeCommunity
.filter("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'")
.count());
Assertions
.assertEquals(
2,
idExplodeCommunity
.filter(
"(community = 'covid-19' or community = 'aginfra') and id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'")
.count());
Assertions
.assertEquals(
2,
idExplodeCommunity
.filter("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'")
.count());
Assertions
.assertEquals(
2,
idExplodeCommunity
.filter(
"(community = 'mes' or community = 'fam') and id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'")
.count());
}
@Test
void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance")
.getPath();
final String pathMap = EOSCTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyT.id = 'covid-19' ";
Assertions.assertEquals(3, spark.sql(query).count());
org.apache.spark.sql.Dataset<Row> communityContext = spark.sql(query);
Assertions
.assertEquals(
2,
communityContext
.filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'")
.count());
Assertions
.assertEquals(
1,
communityContext
.filter(
"id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'community:subject'")
.count());
Assertions
.assertEquals(
1,
communityContext
.filter(
"id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'propagation:community:productsthroughsemrel'")
.count());
query = "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+ "from dataset "
+ "lateral view explode (context) as MyT "
+ "where size(MyT.datainfo) > 0";
Assertions
.assertEquals(
2,
spark
.sql(query)
.select("datainfosize")
.where(
"id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' a"
+ "nd community = 'covid-19'")
.collectAsList()
.get(0)
.getInt(0));
}
@Test
void bulktagByDatasourceTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("publication");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from publication "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
Assertions.assertEquals(5, idExplodeCommunity.count());
Assertions
.assertEquals(
5, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
Assertions
.assertEquals(
5,
idExplodeCommunity
.filter("name = 'Bulktagging for Community - Datasource'")
.count());
Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'fam'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count());
Assertions
.assertEquals(
3,
idExplodeCommunity
.filter(
"community = 'fam' and (id = '50|ec_fp7health::000085c89f4b96dc2269bd37edb35306' "
+ "or id = '50|ec_fp7health::000b9e61f83f5a4b0c35777b7bccdf38' "
+ "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')")
.count());
Assertions
.assertEquals(
2,
idExplodeCommunity
.filter(
"community = 'aginfra' and (id = '50|ec_fp7health::000c8195edd542e4e64ebb32172cbf89' "
+ "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')")
.count());
}
@Test
void bulktagByZenodoCommunityTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct",
"-outputPath", workingDir.toString() + "/orp",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<OtherResearchProduct> tmp = sc
.textFile(workingDir.toString() + "/orp")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<OtherResearchProduct> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(OtherResearchProduct.class));
verificationDataset.createOrReplaceTempView("orp");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from orp "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
Assertions.assertEquals(8, idExplodeCommunity.count());
Assertions
.assertEquals(
8, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count());
Assertions
.assertEquals(
8,
idExplodeCommunity.filter("name = 'Bulktagging for Community - Zenodo'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'covid-19'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'beopen'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'mes'").count());
Assertions
.assertEquals(
1,
idExplodeCommunity
.filter(
"id = '50|od______2017::0750a4d0782265873d669520f5e33c07' "
+ "and community = 'covid-19'")
.count());
Assertions
.assertEquals(
3,
idExplodeCommunity
.filter(
"id = '50|od______2017::1bd97baef19dbd2db3203b112bb83bc5' and "
+ "(community = 'aginfra' or community = 'mes' or community = 'fam')")
.count());
Assertions
.assertEquals(
1,
idExplodeCommunity
.filter(
"id = '50|od______2017::1e400f1747487fd15998735c41a55c72' "
+ "and community = 'beopen'")
.count());
Assertions
.assertEquals(
3,
idExplodeCommunity
.filter(
"id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' and "
+ "(community = 'beopen' or community = 'fam' or community = 'mes')")
.count());
query = "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+ "from orp "
+ "lateral view explode (context) as MyT "
+ "where size(MyT.datainfo) > 0";
Assertions
.assertEquals(
2,
spark
.sql(query)
.select("datainfosize")
.where(
"id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' a"
+ "nd community = 'beopen'")
.collectAsList()
.get(0)
.getInt(0));
// verify the zenodo community context is not present anymore in the records
query = "select id, MyT.id community "
+ "from orp "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD ";
org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);
Assertions
.assertEquals(
0,
tmp2
.select("community")
.where(tmp2.col("community").contains(ZENODO_COMMUNITY_INDICATOR))
.count());
}
@Test
void bulktagBySubjectDatasourceTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
Assertions.assertEquals(7, idExplodeCommunity.count());
Assertions
.assertEquals(
5, idExplodeCommunity.filter("provenance = 'community:subject'").count());
Assertions
.assertEquals(
2, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count());
query = "select id, MyT.id community, size(MyT.datainfo) datainfosize "
+ "from dataset "
+ "lateral view explode (context) as MyT "
+ "where size(MyT.datainfo) > 0";
org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);
Assertions
.assertEquals(
2,
tmp2
.select("datainfosize")
.where(
"id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and "
+ "community = 'aginfra'")
.collectAsList()
.get(0)
.getInt(0));
Assertions
.assertEquals(
1,
tmp2
.select("datainfosize")
.where(
"id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and "
+ "community = 'covid-19'")
.collectAsList()
.get(0)
.getInt(0));
Assertions
.assertEquals(
2,
tmp2
.select("datainfosize")
.where(
"id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and "
+ "community = 'fam'")
.collectAsList()
.get(0)
.getInt(0));
Assertions
.assertEquals(
2,
tmp2
.select("datainfosize")
.where(
"id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and "
+ "community = 'covid-19'")
.collectAsList()
.get(0)
.getInt(0));
Assertions
.assertEquals(
1,
tmp2
.select("datainfosize")
.where(
"id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and "
+ "community = 'fam'")
.collectAsList()
.get(0)
.getInt(0));
Assertions
.assertEquals(
1,
tmp2
.select("datainfosize")
.where(
"id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and "
+ "community = 'mes'")
.collectAsList()
.get(0)
.getInt(0));
}
@Test
void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception {
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(),
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath", workingDir.toString() + "/software",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Software> tmp = sc JavaRDD<Software> tmp = sc
.textFile(workingDir.toString() + "/software") .textFile(workingDir.toString() + "/input/software")
.map(item -> OBJECT_MAPPER.readValue(item, Software.class)); .map(item -> OBJECT_MAPPER.readValue(item, Software.class));
Assertions.assertEquals(10, tmp.count()); Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Software> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Software.class));
verificationDataset.createOrReplaceTempView("software"); Assertions.assertEquals(4, tmp.filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook"))).count());
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " Assertions.assertEquals(2, tmp.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4")).collect()
+ "from software " .get(0).getSubject().size());
+ "lateral view explode(context) c as MyT " Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4")).collect()
+ "lateral view explode(MyT.datainfo) d as MyD " .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
Assertions.assertEquals(10, idExplodeCommunity.count());
idExplodeCommunity.show(false); Assertions.assertEquals(5, tmp.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11")).collect()
Assertions .get(0).getSubject().size());
.assertEquals( Assertions.assertFalse(tmp.filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11")).collect()
3, idExplodeCommunity.filter("provenance = 'community:subject'").count()); .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions
.assertEquals(
3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
Assertions
.assertEquals(
4, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count());
Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'covid-19'").count()); Assertions.assertEquals(9, tmp.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56")).collect()
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dh-ch'").count()); .get(0).getSubject().size());
Assertions.assertEquals(4, idExplodeCommunity.filter("community = 'aginfra'").count()); Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56")).collect()
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dariah'").count()); .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count());
Assertions Assertions.assertEquals(5, tmp.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589")).collect()
.assertEquals( .get(0).getSubject().size());
2, Assertions.assertFalse(tmp.filter(sw -> sw.getId().equals("50|od______1582::5aec1186054301b66c0c5dc35972a589")).collect()
idExplodeCommunity .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
.filter(
"provenance = 'community:zenodocommunity' and "
+ "id = '50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4' and ("
+ "community = 'dh-ch' or community = 'dariah')")
.count());
query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " Assertions.assertEquals(9, tmp.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0")).collect()
+ "from software " .get(0).getSubject().size());
+ "lateral view explode (context) as MyT " Assertions.assertTrue(tmp.filter(sw -> sw.getId().equals("50|od______1582::639909adfad9d708308f2aedb733e4a0")).collect()
+ "where size(MyT.datainfo) > 0"; .get(0).getSubject().stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query); List<StructuredProperty> subjects = tmp.filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244")).collect()
.get(0).getSubject();
Assertions.assertEquals(8, subjects.size());
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("EOSC::Jupyter Notebook")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("jupyter")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("Modeling and Simulation")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("structure granulaire")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("algorithme")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("simulation numérique")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de gaz")));
Assertions.assertTrue(subjects.stream().anyMatch(s -> s.getValue().equals("flux de liquide")));
Assertions
.assertEquals(
2,
tmp2
.select("datainfosize")
.where(
"id = '50|od______1582::501b25d420f808c8eddcd9b16e917f11' and "
+ "community = 'covid-19'")
.collectAsList()
.get(0)
.getInt(0));
Assertions Assertions.assertEquals(10, sc
.assertEquals( .textFile(workingDir.toString() + "/input/dataset")
3, .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)).count());
tmp2
.select("datainfosize") Assertions.assertEquals(0, sc
.where( .textFile(workingDir.toString() + "/input/dataset")
"id = '50|od______1582::581621232a561b7e8b4952b18b8b0e56' and " .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)).filter(ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook"))).count());
+ "community = 'aginfra'")
.collectAsList()
.get(0) Assertions.assertEquals(10, sc
.getInt(0)); .textFile(workingDir.toString() + "/input/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)).count());
Assertions.assertEquals(0, sc
.textFile(workingDir.toString() + "/input/otherresearchproduct")
.map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)).filter(ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook"))).count());
} }
@Test
void bulktagDatasourcewithConstraintsTest() throws Exception {
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints")
.getPath();
SparkBulkTagJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", workingDir.toString() + "/dataset",
"-isLookUpUrl", MOCK_IS_LOOK_UP_URL,
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
idExplodeCommunity.show(false);
Assertions.assertEquals(3, idExplodeCommunity.count());
Assertions
.assertEquals(
3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
}
} }

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long