diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCTagJobTest.java new file mode 100644 index 000000000..2141e1eac --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCTagJobTest.java @@ -0,0 +1,770 @@ + +package eu.dnetlib.dhp.bulktag; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY_INDICATOR; + +public class EOSCTagJobTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static final String MOCK_IS_LOOK_UP_URL = "BASEURL:8280/is/services/isLookUp"; + + public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\"," + + " \"title\" : \"$['title'][*]['value']\"," + + " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\"," + + " \"contributor\" : \"$['contributor'][*]['value']\"," + + " \"description\" : \"$['description'][*]['value']\"}"; + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(EOSCTagJobTest.class); + + private static String taggingConf = ""; + + static { + try { + taggingConf = IOUtils + .toString( + EOSCTagJobTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf.xml")); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(EOSCTagJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(EOSCTagJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(EOSCTagJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void noUpdatesTest() throws Exception { + final String pathMap = EOSCTagJobTest.pathMap; + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", + getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates").getPath(), + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", workingDir.toString() + "/dataset", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = "select id, MyT.id community " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + Assertions.assertEquals(0, spark.sql(query).count()); + } + + @Test + void bulktagBySubjectNoPreviousContextTest() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext") + .getPath(); + final String pathMap = EOSCTagJobTest.pathMap; + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", workingDir.toString() + "/dataset", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + Assertions.assertEquals(5, spark.sql(query).count()); + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + Assertions + .assertEquals( + 5, idExplodeCommunity.filter("provenance = 'community:subject'").count()); + Assertions + .assertEquals( + 5, + idExplodeCommunity.filter("name = 'Bulktagging for Community - Subject'").count()); + + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count()); + + Assertions + .assertEquals( + 1, + idExplodeCommunity + .filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'") + .count()); + Assertions + .assertEquals( + 1, + idExplodeCommunity + .filter( + "community = 'covid-19' and id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'") + .count()); + + Assertions + .assertEquals( + 2, + idExplodeCommunity + .filter("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'") + .count()); + Assertions + .assertEquals( + 2, + idExplodeCommunity + .filter( + "(community = 'covid-19' or community = 'aginfra') and id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'") + .count()); + + Assertions + .assertEquals( + 2, + idExplodeCommunity + .filter("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'") + .count()); + Assertions + .assertEquals( + 2, + idExplodeCommunity + .filter( + "(community = 'mes' or community = 'fam') and id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'") + .count()); + } + + @Test + void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception { + final String sourcePath = getClass() + .getResource( + "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance") + .getPath(); + final String pathMap = EOSCTagJobTest.pathMap; + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", workingDir.toString() + "/dataset", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyT.id = 'covid-19' "; + + Assertions.assertEquals(3, spark.sql(query).count()); + + org.apache.spark.sql.Dataset communityContext = spark.sql(query); + + Assertions + .assertEquals( + 2, + communityContext + .filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'") + .count()); + Assertions + .assertEquals( + 1, + communityContext + .filter( + "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'community:subject'") + .count()); + Assertions + .assertEquals( + 1, + communityContext + .filter( + "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'propagation:community:productsthroughsemrel'") + .count()); + + query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " + + "from dataset " + + "lateral view explode (context) as MyT " + + "where size(MyT.datainfo) > 0"; + + Assertions + .assertEquals( + 2, + spark + .sql(query) + .select("datainfosize") + .where( + "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' a" + + "nd community = 'covid-19'") + .collectAsList() + .get(0) + .getInt(0)); + } + + @Test + void bulktagByDatasourceTest() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource") + .getPath(); + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication", + "-outputPath", workingDir.toString() + "/publication", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/publication") + .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Publication.class)); + + verificationDataset.createOrReplaceTempView("publication"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from publication " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + + Assertions.assertEquals(5, idExplodeCommunity.count()); + Assertions + .assertEquals( + 5, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + Assertions + .assertEquals( + 5, + idExplodeCommunity + .filter("name = 'Bulktagging for Community - Datasource'") + .count()); + + Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'fam'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count()); + + Assertions + .assertEquals( + 3, + idExplodeCommunity + .filter( + "community = 'fam' and (id = '50|ec_fp7health::000085c89f4b96dc2269bd37edb35306' " + + "or id = '50|ec_fp7health::000b9e61f83f5a4b0c35777b7bccdf38' " + + "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')") + .count()); + + Assertions + .assertEquals( + 2, + idExplodeCommunity + .filter( + "community = 'aginfra' and (id = '50|ec_fp7health::000c8195edd542e4e64ebb32172cbf89' " + + "or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')") + .count()); + } + + @Test + void bulktagByZenodoCommunityTest() throws Exception { + final String sourcePath = getClass() + .getResource( + "/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity") + .getPath(); + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct", + "-outputPath", workingDir.toString() + "/orp", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/orp") + .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(OtherResearchProduct.class)); + + verificationDataset.createOrReplaceTempView("orp"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from orp " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + Assertions.assertEquals(8, idExplodeCommunity.count()); + + Assertions + .assertEquals( + 8, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count()); + Assertions + .assertEquals( + 8, + idExplodeCommunity.filter("name = 'Bulktagging for Community - Zenodo'").count()); + + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'covid-19'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'aginfra'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'beopen'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'mes'").count()); + + Assertions + .assertEquals( + 1, + idExplodeCommunity + .filter( + "id = '50|od______2017::0750a4d0782265873d669520f5e33c07' " + + "and community = 'covid-19'") + .count()); + Assertions + .assertEquals( + 3, + idExplodeCommunity + .filter( + "id = '50|od______2017::1bd97baef19dbd2db3203b112bb83bc5' and " + + "(community = 'aginfra' or community = 'mes' or community = 'fam')") + .count()); + Assertions + .assertEquals( + 1, + idExplodeCommunity + .filter( + "id = '50|od______2017::1e400f1747487fd15998735c41a55c72' " + + "and community = 'beopen'") + .count()); + Assertions + .assertEquals( + 3, + idExplodeCommunity + .filter( + "id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' and " + + "(community = 'beopen' or community = 'fam' or community = 'mes')") + .count()); + + query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " + + "from orp " + + "lateral view explode (context) as MyT " + + "where size(MyT.datainfo) > 0"; + + Assertions + .assertEquals( + 2, + spark + .sql(query) + .select("datainfosize") + .where( + "id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' a" + + "nd community = 'beopen'") + .collectAsList() + .get(0) + .getInt(0)); + + // verify the zenodo community context is not present anymore in the records + query = "select id, MyT.id community " + + "from orp " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD "; + + org.apache.spark.sql.Dataset tmp2 = spark.sql(query); + + Assertions + .assertEquals( + 0, + tmp2 + .select("community") + .where(tmp2.col("community").contains(ZENODO_COMMUNITY_INDICATOR)) + .count()); + } + + @Test + void bulktagBySubjectDatasourceTest() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource") + .getPath(); + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", workingDir.toString() + "/dataset", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + Assertions.assertEquals(7, idExplodeCommunity.count()); + + Assertions + .assertEquals( + 5, idExplodeCommunity.filter("provenance = 'community:subject'").count()); + Assertions + .assertEquals( + 2, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'covid-19'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'fam'").count()); + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'aginfra'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count()); + + query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " + + "from dataset " + + "lateral view explode (context) as MyT " + + "where size(MyT.datainfo) > 0"; + + org.apache.spark.sql.Dataset tmp2 = spark.sql(query); + + Assertions + .assertEquals( + 2, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and " + + "community = 'aginfra'") + .collectAsList() + .get(0) + .getInt(0)); + + Assertions + .assertEquals( + 1, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and " + + "community = 'covid-19'") + .collectAsList() + .get(0) + .getInt(0)); + + Assertions + .assertEquals( + 2, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and " + + "community = 'fam'") + .collectAsList() + .get(0) + .getInt(0)); + Assertions + .assertEquals( + 2, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and " + + "community = 'covid-19'") + .collectAsList() + .get(0) + .getInt(0)); + + Assertions + .assertEquals( + 1, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and " + + "community = 'fam'") + .collectAsList() + .get(0) + .getInt(0)); + Assertions + .assertEquals( + 1, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and " + + "community = 'mes'") + .collectAsList() + .get(0) + .getInt(0)); + } + + @Test + void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception { + + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath(), + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Software", + "-outputPath", workingDir.toString() + "/software", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/software") + .map(item -> OBJECT_MAPPER.readValue(item, Software.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Software.class)); + + verificationDataset.createOrReplaceTempView("software"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from software " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + Assertions.assertEquals(10, idExplodeCommunity.count()); + + idExplodeCommunity.show(false); + Assertions + .assertEquals( + 3, idExplodeCommunity.filter("provenance = 'community:subject'").count()); + Assertions + .assertEquals( + 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + Assertions + .assertEquals( + 4, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count()); + + Assertions.assertEquals(3, idExplodeCommunity.filter("community = 'covid-19'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dh-ch'").count()); + Assertions.assertEquals(4, idExplodeCommunity.filter("community = 'aginfra'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'dariah'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'fam'").count()); + + Assertions + .assertEquals( + 2, + idExplodeCommunity + .filter( + "provenance = 'community:zenodocommunity' and " + + "id = '50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4' and (" + + "community = 'dh-ch' or community = 'dariah')") + .count()); + + query = "select id, MyT.id community, size(MyT.datainfo) datainfosize " + + "from software " + + "lateral view explode (context) as MyT " + + "where size(MyT.datainfo) > 0"; + + org.apache.spark.sql.Dataset tmp2 = spark.sql(query); + + Assertions + .assertEquals( + 2, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______1582::501b25d420f808c8eddcd9b16e917f11' and " + + "community = 'covid-19'") + .collectAsList() + .get(0) + .getInt(0)); + + Assertions + .assertEquals( + 3, + tmp2 + .select("datainfosize") + .where( + "id = '50|od______1582::581621232a561b7e8b4952b18b8b0e56' and " + + "community = 'aginfra'") + .collectAsList() + .get(0) + .getInt(0)); + } + + @Test + void bulktagDatasourcewithConstraintsTest() throws Exception { + + final String sourcePath = getClass() + .getResource( + "/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints") + .getPath(); + SparkBulkTagJob + .main( + new String[] { + "-isTest", Boolean.TRUE.toString(), + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset", + "-outputPath", workingDir.toString() + "/dataset", + "-isLookUpUrl", MOCK_IS_LOOK_UP_URL, + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + + Assertions.assertEquals(10, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + + verificationDataset.createOrReplaceTempView("dataset"); + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + + idExplodeCommunity.show(false); + Assertions.assertEquals(3, idExplodeCommunity.count()); + + Assertions + .assertEquals( + 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + } +}