From 12de9acb0da61cc83e4178a7d6619722532d97fd Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 11 Mar 2022 14:17:02 +0100 Subject: [PATCH] [Country Propagation] left out from previous commit --- .../PrepareDatasourceCountryAssociation.java | 7 +- .../PrepareResultCountrySet.java | 2 +- .../SparkCountryPropagationJob.java | 1 - .../CountryPropagationJobTest.java | 152 +++++++++--- .../DatasourceCountryPreparationTest.java | 219 ++++++++++++------ .../ResultCountryPreparationTest.java | 201 +++++++++------- 6 files changed, 383 insertions(+), 199 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java index c02f37015..42b7804ea 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java @@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Arrays; import java.util.List; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -20,6 +19,8 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Datasource; @@ -100,8 +101,8 @@ public class PrepareDatasourceCountryAssociation { Dataset organization = readPath(spark, inputPath + "/organization", Organization.class) .filter( (FilterFunction) o -> !o.getDataInfo().getDeletedbyinference() && - o.getCountry().getClassid().length() > 0 && - !o.getCountry().getClassid().equals(ModelConstants.UNKNOWN)) ; + o.getCountry().getClassid().length() > 0 && + !o.getCountry().getClassid().equals(ModelConstants.UNKNOWN)); // associated the datasource id with the id of the organization providing the datasource Dataset dse = datasource diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java index 8ca87fa21..560a22381 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java @@ -61,7 +61,7 @@ public class PrepareResultCountrySet { Class resultClazz = (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); - //conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + // conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); runWithSparkSession( conf, diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 1bbfbdafd..56aa953b4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -80,7 +80,6 @@ public class SparkCountryPropagationJob { String outputPath, Class resultClazz) { - log.info("Reading Graph table from: {}", sourcePath); Dataset res = readPath(spark, sourcePath, resultClazz); diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java index 50c7f3a69..c4141b3e8 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/CountryPropagationJobTest.java @@ -7,7 +7,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Country; +import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Software; import scala.Tuple2; @@ -260,66 +260,142 @@ public class CountryPropagationJobTest { @Test void testCountryPropagationPublication() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/graph/publication") - .getPath(); + .getResource("/eu/dnetlib/dhp/countrypropagation/graph/publication") + .getPath(); final String preparedInfoPath = getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/preparedInfo/publication") - .getPath(); + .getResource("/eu/dnetlib/dhp/countrypropagation/preparedInfo/publication") + .getPath(); SparkCountryPropagationJob - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", sourcePath, - "-resultTableName", Publication.class.getCanonicalName(), - "-outputPath", workingDir.toString() + "/publication", - "-preparedInfoPath", preparedInfoPath - }); + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + "-resultTableName", Publication.class.getCanonicalName(), + "-outputPath", workingDir.toString() + "/publication", + "-preparedInfoPath", preparedInfoPath + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/publication") - .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); + .textFile(workingDir.toString() + "/publication") + .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); Assertions.assertEquals(12, tmp.count()); Assertions.assertEquals(5, tmp.filter(r -> r.getCountry().size() > 0).count()); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("dnet:countries", c.getSchemeid()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("dnet:countries", c.getSchemename()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertFalse(c.getDataInfo().getDeletedbyinference()))); + tmp + .foreach( + r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("dnet:countries", c.getSchemeid()))); + tmp + .foreach( + r -> r + .getCountry() + .stream() + .forEach(c -> Assertions.assertEquals("dnet:countries", c.getSchemename()))); + tmp + .foreach( + r -> r + .getCountry() + .stream() + .forEach(c -> Assertions.assertFalse(c.getDataInfo().getDeletedbyinference()))); tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertFalse(c.getDataInfo().getInvisible()))); tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertTrue(c.getDataInfo().getInferred()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("0.85", c.getDataInfo().getTrust()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("propagation", c.getDataInfo().getInferenceprovenance()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("country:instrepos", c.getDataInfo().getProvenanceaction().getClassid()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("dnet:provenanceActions", c.getDataInfo().getProvenanceaction().getSchemeid()))); - tmp.foreach(r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("dnet:provenanceActions", c.getDataInfo().getProvenanceaction().getSchemename()))); + tmp + .foreach( + r -> r.getCountry().stream().forEach(c -> Assertions.assertEquals("0.85", c.getDataInfo().getTrust()))); + tmp + .foreach( + r -> r + .getCountry() + .stream() + .forEach(c -> Assertions.assertEquals("propagation", c.getDataInfo().getInferenceprovenance()))); + tmp + .foreach( + r -> r + .getCountry() + .stream() + .forEach( + c -> Assertions + .assertEquals("country:instrepos", c.getDataInfo().getProvenanceaction().getClassid()))); + tmp + .foreach( + r -> r + .getCountry() + .stream() + .forEach( + c -> Assertions + .assertEquals( + "dnet:provenanceActions", c.getDataInfo().getProvenanceaction().getSchemeid()))); + tmp + .foreach( + r -> r + .getCountry() + .stream() + .forEach( + c -> Assertions + .assertEquals( + "dnet:provenanceActions", c.getDataInfo().getProvenanceaction().getSchemename()))); - List countries = tmp.filter(r -> r.getId().equals("50|06cdd3ff4700::49ec404cee4e1452808aabeaffbd3072")).collect().get(0).getCountry(); + List countries = tmp + .filter(r -> r.getId().equals("50|06cdd3ff4700::49ec404cee4e1452808aabeaffbd3072")) + .collect() + .get(0) + .getCountry(); Assertions.assertEquals(1, countries.size()); - Assertions.assertEquals("NL",countries.get(0).getClassid()); - Assertions.assertEquals("Netherlands",countries.get(0).getClassname()); + Assertions.assertEquals("NL", countries.get(0).getClassid()); + Assertions.assertEquals("Netherlands", countries.get(0).getClassname()); - countries = tmp.filter(r -> r.getId().equals("50|07b5c0ccd4fe::e7f5459cc97865f2af6e3da964c1250b")).collect().get(0).getCountry(); + countries = tmp + .filter(r -> r.getId().equals("50|07b5c0ccd4fe::e7f5459cc97865f2af6e3da964c1250b")) + .collect() + .get(0) + .getCountry(); Assertions.assertEquals(1, countries.size()); - Assertions.assertEquals("NL",countries.get(0).getClassid()); - Assertions.assertEquals("Netherlands",countries.get(0).getClassname()); + Assertions.assertEquals("NL", countries.get(0).getClassid()); + Assertions.assertEquals("Netherlands", countries.get(0).getClassname()); - countries = tmp.filter(r -> r.getId().equals("50|355e65625b88::e7d48a470b13bda61f7ebe3513e20cb6")).collect().get(0).getCountry(); + countries = tmp + .filter(r -> r.getId().equals("50|355e65625b88::e7d48a470b13bda61f7ebe3513e20cb6")) + .collect() + .get(0) + .getCountry(); Assertions.assertEquals(2, countries.size()); - Assertions.assertTrue(countries.stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); - Assertions.assertTrue(countries.stream().anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); + Assertions + .assertTrue( + countries.stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); + Assertions + .assertTrue( + countries.stream().anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); - countries = tmp.filter(r -> r.getId().equals("50|355e65625b88::74009c567c81b4aa55c813db658734df")).collect().get(0).getCountry(); + countries = tmp + .filter(r -> r.getId().equals("50|355e65625b88::74009c567c81b4aa55c813db658734df")) + .collect() + .get(0) + .getCountry(); Assertions.assertEquals(2, countries.size()); - Assertions.assertTrue(countries.stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); - Assertions.assertTrue(countries.stream().anyMatch(cs -> cs.getClassid().equals("NL") && cs.getClassname().equals("Netherlands"))); + Assertions + .assertTrue( + countries.stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); + Assertions + .assertTrue( + countries + .stream() + .anyMatch(cs -> cs.getClassid().equals("NL") && cs.getClassname().equals("Netherlands"))); - countries = tmp.filter(r -> r.getId().equals("50|355e65625b88::54a1c76f520bb2c8da27d12e42891088")).collect().get(0).getCountry(); + countries = tmp + .filter(r -> r.getId().equals("50|355e65625b88::54a1c76f520bb2c8da27d12e42891088")) + .collect() + .get(0) + .getCountry(); Assertions.assertEquals(2, countries.size()); - Assertions.assertTrue(countries.stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); - Assertions.assertTrue(countries.stream().anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); + Assertions + .assertTrue( + countries.stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); + Assertions + .assertTrue( + countries.stream().anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); } } diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountryPreparationTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountryPreparationTest.java index 8560a9d66..d9b879de8 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountryPreparationTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/DatasourceCountryPreparationTest.java @@ -1,111 +1,176 @@ + package eu.dnetlib.dhp.countrypropagation; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; - import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; - +import com.fasterxml.jackson.databind.ObjectMapper; public class DatasourceCountryPreparationTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; + private static Path workingDir; - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(DatasourceCountryPreparationTest.class.getSimpleName()); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DatasourceCountryPreparationTest.class.getSimpleName()); - SparkConf conf = new SparkConf(); - conf.setAppName(DatasourceCountryPreparationTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(DatasourceCountryPreparationTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(DatasourceCountryPreparationTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(DatasourceCountryPreparationTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - void testPrepareDatasourceCountry() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/graph") - .getPath(); + @Test + void testPrepareDatasourceCountry() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/countrypropagation/graph") + .getPath(); - PrepareDatasourceCountryAssociation - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", sourcePath, - "--outputPath", workingDir.toString() + "/datasourceCountry", - "--allowedtypes","pubsrepository::institutional" , - "--whitelist","10|openaire____::3795d6478e30e2c9f787d427ff160944;10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48" - }); + PrepareDatasourceCountryAssociation + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + "--outputPath", workingDir.toString() + "/datasourceCountry", + "--allowedtypes", "pubsrepository::institutional", + "--whitelist", + "10|openaire____::3795d6478e30e2c9f787d427ff160944;10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48" + }); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/datasourceCountry") - .map(item -> OBJECT_MAPPER.readValue(item, DatasourceCountry.class)); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/datasourceCountry") + .map(item -> OBJECT_MAPPER.readValue(item, DatasourceCountry.class)); - Assertions.assertEquals(3, tmp.count()); - Assertions.assertEquals(1, tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|eurocrisdris::fe4903425d9040f680d8610d9079ea14")).count()); - Assertions.assertEquals(1, tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|opendoar____::f0dd4a99fba6075a9494772b58f95280")).count()); - Assertions.assertEquals(1, tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539")).count()); + Assertions.assertEquals(3, tmp.count()); + Assertions + .assertEquals( + 1, tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|eurocrisdris::fe4903425d9040f680d8610d9079ea14")) + .count()); + Assertions + .assertEquals( + 1, tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|opendoar____::f0dd4a99fba6075a9494772b58f95280")) + .count()); + Assertions + .assertEquals( + 1, tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539")) + .count()); - Assertions.assertEquals("NL", tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|eurocrisdris::fe4903425d9040f680d8610d9079ea14")).collect() - .get(0).getCountry().getClassid()); - Assertions.assertEquals("Netherlands", tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|eurocrisdris::fe4903425d9040f680d8610d9079ea14")).collect() - .get(0).getCountry().getClassname()); + Assertions + .assertEquals( + "NL", tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|eurocrisdris::fe4903425d9040f680d8610d9079ea14")) + .collect() + .get(0) + .getCountry() + .getClassid()); + Assertions + .assertEquals( + "Netherlands", tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|eurocrisdris::fe4903425d9040f680d8610d9079ea14")) + .collect() + .get(0) + .getCountry() + .getClassname()); - Assertions.assertEquals("IT", tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|opendoar____::f0dd4a99fba6075a9494772b58f95280")).collect() - .get(0).getCountry().getClassid()); - Assertions.assertEquals("Italy", tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|opendoar____::f0dd4a99fba6075a9494772b58f95280")).collect() - .get(0).getCountry().getClassname()); + Assertions + .assertEquals( + "IT", tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|opendoar____::f0dd4a99fba6075a9494772b58f95280")) + .collect() + .get(0) + .getCountry() + .getClassid()); + Assertions + .assertEquals( + "Italy", tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|opendoar____::f0dd4a99fba6075a9494772b58f95280")) + .collect() + .get(0) + .getCountry() + .getClassname()); - Assertions.assertEquals("FR", tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539")).collect() - .get(0).getCountry().getClassid()); - Assertions.assertEquals("France", tmp.filter(dsc -> dsc.getDataSourceId() - .equals("10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539")).collect() - .get(0).getCountry().getClassname()); + Assertions + .assertEquals( + "FR", tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539")) + .collect() + .get(0) + .getCountry() + .getClassid()); + Assertions + .assertEquals( + "France", tmp + .filter( + dsc -> dsc + .getDataSourceId() + .equals("10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539")) + .collect() + .get(0) + .getCountry() + .getClassname()); - tmp.foreach(e -> System.out.println(OBJECT_MAPPER.writeValueAsString(e))); + tmp.foreach(e -> System.out.println(OBJECT_MAPPER.writeValueAsString(e))); - } + } } diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/ResultCountryPreparationTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/ResultCountryPreparationTest.java index 37cc4f342..797d1c979 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/ResultCountryPreparationTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/countrypropagation/ResultCountryPreparationTest.java @@ -1,8 +1,12 @@ + package eu.dnetlib.dhp.countrypropagation; -import com.fasterxml.jackson.databind.ObjectMapper; +import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; -import eu.dnetlib.dhp.schema.oaf.Publication; import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -13,103 +17,142 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +import com.fasterxml.jackson.databind.ObjectMapper; -import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged; +import eu.dnetlib.dhp.schema.oaf.Publication; public class ResultCountryPreparationTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; + private static Path workingDir; - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(ResultCountryPreparationTest.class.getSimpleName()); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(ResultCountryPreparationTest.class.getSimpleName()); - SparkConf conf = new SparkConf(); - conf.setAppName(ResultCountryPreparationTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(ResultCountryPreparationTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(ResultCountryPreparationTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(ResultCountryPreparationTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - void testPrepareResultCountry() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/graph/publication") - .getPath(); + @Test + void testPrepareResultCountry() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/countrypropagation/graph/publication") + .getPath(); - final String preparedInfoPath = getClass() - .getResource("/eu/dnetlib/dhp/countrypropagation/datasourcecountry") - .getPath(); + final String preparedInfoPath = getClass() + .getResource("/eu/dnetlib/dhp/countrypropagation/datasourcecountry") + .getPath(); + PrepareResultCountrySet + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--workingPath", workingDir.toString() + "/working", + "--sourcePath", sourcePath, + "--outputPath", workingDir.toString() + "/resultCountry", + "--preparedInfoPath", preparedInfoPath, + "--resultTableName", Publication.class.getCanonicalName() + }); - PrepareResultCountrySet - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--workingPath", workingDir.toString() + "/working", - "--sourcePath", sourcePath, - "--outputPath", workingDir.toString() + "/resultCountry", - "--preparedInfoPath", preparedInfoPath, - "--resultTableName", Publication.class.getCanonicalName() - }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/resultCountry") + .map(item -> OBJECT_MAPPER.readValue(item, ResultCountrySet.class)); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/resultCountry") - .map(item -> OBJECT_MAPPER.readValue(item, ResultCountrySet.class)); + Assertions.assertEquals(5, tmp.count()); + ResultCountrySet rc = tmp + .filter(r -> r.getResultId().equals("50|06cdd3ff4700::49ec404cee4e1452808aabeaffbd3072")) + .collect() + .get(0); + Assertions.assertEquals(1, rc.getCountrySet().size()); + Assertions.assertEquals("NL", rc.getCountrySet().get(0).getClassid()); + Assertions.assertEquals("Netherlands", rc.getCountrySet().get(0).getClassname()); - Assertions.assertEquals(5, tmp.count()); + rc = tmp + .filter(r -> r.getResultId().equals("50|07b5c0ccd4fe::e7f5459cc97865f2af6e3da964c1250b")) + .collect() + .get(0); + Assertions.assertEquals(1, rc.getCountrySet().size()); + Assertions.assertEquals("NL", rc.getCountrySet().get(0).getClassid()); + Assertions.assertEquals("Netherlands", rc.getCountrySet().get(0).getClassname()); - ResultCountrySet rc = tmp.filter(r -> r.getResultId().equals("50|06cdd3ff4700::49ec404cee4e1452808aabeaffbd3072")).collect().get(0); - Assertions.assertEquals(1, rc.getCountrySet().size()); - Assertions.assertEquals("NL",rc.getCountrySet().get(0).getClassid()); - Assertions.assertEquals("Netherlands",rc.getCountrySet().get(0).getClassname()); + rc = tmp + .filter(r -> r.getResultId().equals("50|355e65625b88::e7d48a470b13bda61f7ebe3513e20cb6")) + .collect() + .get(0); + Assertions.assertEquals(2, rc.getCountrySet().size()); + Assertions + .assertTrue( + rc + .getCountrySet() + .stream() + .anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); + Assertions + .assertTrue( + rc + .getCountrySet() + .stream() + .anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); - rc = tmp.filter(r -> r.getResultId().equals("50|07b5c0ccd4fe::e7f5459cc97865f2af6e3da964c1250b")).collect().get(0); - Assertions.assertEquals(1, rc.getCountrySet().size()); - Assertions.assertEquals("NL",rc.getCountrySet().get(0).getClassid()); - Assertions.assertEquals("Netherlands",rc.getCountrySet().get(0).getClassname()); + rc = tmp + .filter(r -> r.getResultId().equals("50|355e65625b88::74009c567c81b4aa55c813db658734df")) + .collect() + .get(0); + Assertions.assertEquals(2, rc.getCountrySet().size()); + Assertions + .assertTrue( + rc + .getCountrySet() + .stream() + .anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); + Assertions + .assertTrue( + rc + .getCountrySet() + .stream() + .anyMatch(cs -> cs.getClassid().equals("NL") && cs.getClassname().equals("Netherlands"))); - rc = tmp.filter(r -> r.getResultId().equals("50|355e65625b88::e7d48a470b13bda61f7ebe3513e20cb6")).collect().get(0); - Assertions.assertEquals(2, rc.getCountrySet().size()); - Assertions.assertTrue(rc.getCountrySet().stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); - Assertions.assertTrue(rc.getCountrySet().stream().anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); + rc = tmp + .filter(r -> r.getResultId().equals("50|355e65625b88::54a1c76f520bb2c8da27d12e42891088")) + .collect() + .get(0); + Assertions.assertEquals(2, rc.getCountrySet().size()); + Assertions + .assertTrue( + rc + .getCountrySet() + .stream() + .anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); + Assertions + .assertTrue( + rc + .getCountrySet() + .stream() + .anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); - rc = tmp.filter(r -> r.getResultId().equals("50|355e65625b88::74009c567c81b4aa55c813db658734df")).collect().get(0); - Assertions.assertEquals(2, rc.getCountrySet().size()); - Assertions.assertTrue(rc.getCountrySet().stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); - Assertions.assertTrue(rc.getCountrySet().stream().anyMatch(cs -> cs.getClassid().equals("NL") && cs.getClassname().equals("Netherlands"))); - - - rc = tmp.filter(r -> r.getResultId().equals("50|355e65625b88::54a1c76f520bb2c8da27d12e42891088")).collect().get(0); - Assertions.assertEquals(2, rc.getCountrySet().size()); - Assertions.assertTrue(rc.getCountrySet().stream().anyMatch(cs -> cs.getClassid().equals("IT") && cs.getClassname().equals("Italy"))); - Assertions.assertTrue(rc.getCountrySet().stream().anyMatch(cs -> cs.getClassid().equals("FR") && cs.getClassname().equals("France"))); - - - } + } }