diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
index 600a5cec8..3ed910184 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
@@ -93,13 +93,9 @@ public class SparkEoscBulkTag implements Serializable {
 		String datasourceMapPath,
 		Class<R> resultClazz) {
-
 		List<String> hostedByList = readPath(spark, datasourceMapPath, DatasourceMaster.class)
 			.map((MapFunction<DatasourceMaster, String>) dm -> dm.getMaster(), Encoders.STRING())
-			.collectAsList();
-
-
-
+			.collectAsList();
 
 		readPath(spark, inputPath, resultClazz)
 			.map(patchResult(), Encoders.bean(resultClazz))
 
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java
index d6785acc7..cbdab7628 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java
@@ -1,16 +1,11 @@
 
 package eu.dnetlib.dhp.bulktag;
 
-/**
- * @author miriam.baglioni
- * @Date 22/07/22
- */
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
-import eu.dnetlib.dhp.schema.oaf.Software;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -26,10 +21,17 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
+/**
+ * @author miriam.baglioni
+ * @Date 22/07/22
+ */
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Software;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 
 //"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc
 //"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc
@@ -37,116 +39,124 @@ import java.util.List;
 //"50|475c1990cbb2::3894c94123e96df8a21249957cf160cb" has EoscTag
 
 public class EOSCContextTaggingTest {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    private static SparkSession spark;
+	private static SparkSession spark;
 
-    private static Path workingDir;
+	private static Path workingDir;
 
-    private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
+	private static final Logger log = LoggerFactory.getLogger(EOSCContextTaggingTest.class);
 
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        workingDir = Files.createTempDirectory(EOSCContextTaggingTest.class.getSimpleName());
-        log.info("using work dir {}", workingDir);
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(EOSCContextTaggingTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
 
-        SparkConf conf = new SparkConf();
-        conf.setAppName(EOSCContextTaggingTest.class.getSimpleName());
+		SparkConf conf = new SparkConf();
+		conf.setAppName(EOSCContextTaggingTest.class.getSimpleName());
 
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("spark.ui.enabled", "false");
-        conf.set("spark.sql.warehouse.dir", workingDir.toString());
-        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
 
-        spark = SparkSession
-            .builder()
-            .appName(EOSCTagJobTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
+		spark = SparkSession
+			.builder()
+			.appName(EOSCTagJobTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(workingDir.toFile());
-        spark.stop();
-    }
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
 
-    @Test
-    void EoscContextTagTest() throws Exception {
+	@Test
+	void EoscContextTagTest() throws Exception {
 
-        spark
-            .read()
-            .textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json").getPath())
-            .map(
-                (MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
-                Encoders.bean(Dataset.class))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(workingDir.toString() + "/input/dataset");
+		spark
+			.read()
+			.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json").getPath())
+			.map(
+				(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
+				Encoders.bean(Dataset.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir.toString() + "/input/dataset");
+
+		SparkEoscBulkTag
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath",
+					workingDir.toString() + "/input/dataset",
+					"-workingPath", workingDir.toString() + "/working/dataset",
+					"-datasourceMapPath",
+					getClass()
+						.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
+						.getPath(),
+					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset"
+				});
 
-        SparkEoscBulkTag
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-sourcePath",
-                    workingDir.toString() + "/input/dataset",
-                    "-workingPath", workingDir.toString() + "/working/dataset",
-                    "-datasourceMapPath", getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster").getPath(),
-                    "-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset"
-                });
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		JavaRDD<Dataset> tmp = sc
+			.textFile(workingDir.toString() + "/input/dataset")
+			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
 
-        JavaRDD<Dataset> tmp = sc
-            .textFile(workingDir.toString() + "/input/dataset")
-            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+		Assertions.assertEquals(10, tmp.count());
 
-        Assertions.assertEquals(10, tmp.count());
+		Assertions
+			.assertEquals(
+				4,
+				tmp
+					.filter(
+						s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 
-        Assertions
-            .assertEquals(
-                4,
-                tmp
-                    .filter(
-                        s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
-                    .count());
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 
-        Assertions
-            .assertEquals(1,
-                tmp
-                    .filter(d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
-                        &&
-                        d.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))).count()
-            );
-        Assertions
-            .assertEquals(1,
-                tmp
-                    .filter(d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
-                        &&
-                        d.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))).count()
-            );
-
-
-        Assertions
-            .assertEquals(1,
-                tmp
-                    .filter(d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
-                        &&
-                        d.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))).count()
-            );
-
-        Assertions
-            .assertEquals(1,
-                tmp
-                    .filter(d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
-                        &&
-                        d.getContext().stream().anyMatch(c -> c.getId().equals("eosc"))).count()
-            );
-    }
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
+		Assertions
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
+	}
 }