diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
index 444f3fb25b..010ad5a875 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/eosc/SparkEoscBulkTag.java
@@ -97,25 +97,29 @@ public class SparkEoscBulkTag implements Serializable {
 			});
 	}
 
-	private static void selectCompliantDatasources(SparkSession spark, String inputPath, String workingPath, String datasourceMapPath) {
+	private static void selectCompliantDatasources(SparkSession spark, String inputPath, String workingPath,
+		String datasourceMapPath) {
 		Dataset<Datasource> datasources = readPath(spark, inputPath + "datasource", Datasource.class)
-				.filter((FilterFunction<Datasource>) ds -> {
-					final String compatibility = ds.getOpenairecompatibility().getClassid();
-					return compatibility.equalsIgnoreCase(OPENAIRE_3) ||
-							compatibility.equalsIgnoreCase(OPENAIRE_4) ||
-							compatibility.equalsIgnoreCase(OPENAIRE_CRIS) ||
-							compatibility.equalsIgnoreCase(OPENAIRE_DATA);
-				});
+			.filter((FilterFunction<Datasource>) ds -> {
+				final String compatibility = ds.getOpenairecompatibility().getClassid();
+				return compatibility.equalsIgnoreCase(OPENAIRE_3) ||
+					compatibility.equalsIgnoreCase(OPENAIRE_4) ||
+					compatibility.equalsIgnoreCase(OPENAIRE_CRIS) ||
+					compatibility.equalsIgnoreCase(OPENAIRE_DATA);
+			});
 
 		Dataset<DatasourceMaster> datasourceMaster = readPath(spark, datasourceMapPath, DatasourceMaster.class);
 
-		datasources.joinWith(datasourceMaster, datasources.col("id").equalTo(datasourceMaster.col("master")), "left")
-				.map((MapFunction<Tuple2<Datasource, DatasourceMaster>, DatasourceMaster>) t2 -> t2._2(), Encoders.bean(DatasourceMaster.class) )
-				.filter(Objects::nonNull)
-				.write()
-				.mode(SaveMode.Overwrite)
-				.option("compression", "gzip")
-				.json(workingPath + "datasource");
+		datasources
+			.joinWith(datasourceMaster, datasources.col("id").equalTo(datasourceMaster.col("master")), "left")
+			.map(
+				(MapFunction<Tuple2<Datasource, DatasourceMaster>, DatasourceMaster>) t2 -> t2._2(),
+				Encoders.bean(DatasourceMaster.class))
+			.filter(Objects::nonNull)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath + "datasource");
 	}
 
 	private static void execBulkTag(
@@ -130,7 +134,7 @@ public class SparkEoscBulkTag implements Serializable {
 			.collectAsList();
 
 		readPath(spark, inputPath + resultType, resultClazz)
-				.map(
+			.map(
 				(MapFunction<R, R>) value -> enrich(value, hostedByList),
 				Encoders.bean(resultClazz))
 			.write()
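Reviewer note on the `selectCompliantDatasources` hunk above: the reformatted chain is a left `joinWith` followed by a projection onto the right side and a `filter(Objects::nonNull)`, which together behave like an inner join projected onto `DatasourceMaster`. A minimal, self-contained sketch of that idiom, assuming hypothetical stand-in beans (`Ds`, `Master`) rather than the production classes:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class JoinIdiomSketch {

	// Hypothetical stand-ins for Datasource and DatasourceMaster.
	public static class Ds implements Serializable {
		private String id;

		public String getId() { return id; }
		public void setId(String id) { this.id = id; }
	}

	public static class Master implements Serializable {
		private String master;

		public String getMaster() { return master; }
		public void setMaster(String master) { this.master = master; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("join-idiom").getOrCreate();

		Ds a = new Ds();
		a.setId("a");
		Ds b = new Ds();
		b.setId("b");
		Master mA = new Master();
		mA.setMaster("a");

		Dataset<Ds> datasources = spark.createDataset(Arrays.asList(a, b), Encoders.bean(Ds.class));
		Dataset<Master> masters = spark.createDataset(Arrays.asList(mA), Encoders.bean(Master.class));

		// Left join: datasources without a matching master get a null right side...
		Dataset<Master> kept = datasources
			.joinWith(masters, datasources.col("id").equalTo(masters.col("master")), "left")
			.map((MapFunction<Tuple2<Ds, Master>, Master>) t2 -> t2._2(), Encoders.bean(Master.class))
			// ...which the nonNull filter drops, so only the matched master ("a") survives.
			.filter((FilterFunction<Master>) Objects::nonNull);

		kept.show();
		spark.stop();
	}
}
```

An `"inner"` join type would express the same result without the null round-trip; the left-join-plus-filter form in the patch keeps the behaviour of the pre-existing code unchanged, since this is a formatting-only change.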
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java
index e304356076..b8d9e3335e 100644
--- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/EOSCContextTaggingTest.java
@@ -6,8 +6,6 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
-import eu.dnetlib.dhp.bulktag.eosc.DatasourceMaster;
-import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -29,7 +27,9 @@ import org.slf4j.LoggerFactory;
 */
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.dhp.bulktag.eosc.DatasourceMaster;
 import eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag;
+import eu.dnetlib.dhp.schema.oaf.*;
 
 //"50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea" has instance hostedby eosc (cris)
 //"50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1" has instance hostedby eosc (zenodo)
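For readers skimming the hunks below: both test methods load fixtures the same way, reading a JSON-lines resource as a `Dataset<String>` and deserializing each line with Jackson into a bean-encoded typed `Dataset`. A distilled, standalone sketch of that pattern, assuming an illustrative `Record` bean and placeholder paths rather than the project's resources:

```java
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

public class FixtureLoadSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	// Illustrative placeholder bean, not one of the project's model classes.
	public static class Record implements Serializable {
		private String id;

		public String getId() { return id; }
		public void setId(String id) { this.id = id; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("fixture-load").getOrCreate();

		spark
			.read()
			.textFile("/tmp/fixtures/records.json") // one JSON object per line
			.map(
				(MapFunction<String, Record>) value -> OBJECT_MAPPER.readValue(value, Record.class),
				Encoders.bean(Record.class))
			.write()
			.mode(SaveMode.Overwrite) // reruns overwrite the previous output
			.option("compression", "gzip")
			.json("/tmp/working/records");

		spark.stop();
	}
}
```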
@@ -74,103 +74,105 @@ public class EOSCContextTaggingTest {
 	}
 	@Test
-	void EoscContextTagTest() throws Exception{
+	void EoscContextTagTest() throws Exception {
 
 		spark
-				.read()
-				.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasource/datasource_1").getPath())
-				.map(
-						(MapFunction<String, Datasource>) value -> OBJECT_MAPPER.readValue(value, Datasource.class),
-						Encoders.bean(Datasource.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.option("compression", "gzip")
-				.json(workingDir.toString() + "/input/datasource");
+			.read()
+			.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasource/datasource_1").getPath())
+			.map(
+				(MapFunction<String, Datasource>) value -> OBJECT_MAPPER.readValue(value, Datasource.class),
+				Encoders.bean(Datasource.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir.toString() + "/input/datasource");
 
 		spark
-				.read()
-				.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json").getPath())
-				.map(
-						(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
-						Encoders.bean(Dataset.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.option("compression", "gzip")
-				.json(workingDir.toString() + "/input/dataset");
+			.read()
+			.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/dataset_10.json").getPath())
+			.map(
+				(MapFunction<String, Dataset>) value -> OBJECT_MAPPER.readValue(value, Dataset.class),
+				Encoders.bean(Dataset.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir.toString() + "/input/dataset");
 
 		SparkEoscBulkTag
-				.main(
-						new String[] {
-								"-isSparkSessionManaged", Boolean.FALSE.toString(),
-								"-sourcePath",
-								workingDir.toString() + "/input/",
-								"-workingPath", workingDir.toString() + "/working/",
-								"-datasourceMapPath",
-								getClass()
-										.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
-										.getPath(),
-								"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
-								"-resultType", "dataset"
-						});
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath",
+					workingDir.toString() + "/input/",
+					"-workingPath", workingDir.toString() + "/working/",
+					"-datasourceMapPath",
+					getClass()
+						.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
+						.getPath(),
+					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
+					"-resultType", "dataset"
+				});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		Assertions.assertEquals(2, sc
-				.textFile(workingDir.toString() + "/working/datasource")
-				.map(item -> OBJECT_MAPPER.readValue(item, DatasourceMaster.class)).count());
-
+		Assertions
+			.assertEquals(
+				2, sc
+					.textFile(workingDir.toString() + "/working/datasource")
+					.map(item -> OBJECT_MAPPER.readValue(item, DatasourceMaster.class))
+					.count());
 		JavaRDD<Dataset> tmp = sc
-				.textFile(workingDir.toString() + "/input/dataset")
-				.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
+			.textFile(workingDir.toString() + "/input/dataset")
+			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
 
 		Assertions.assertEquals(10, tmp.count());
 
 		Assertions
-				.assertEquals(
-						2,
-						tmp
-								.filter(
-										s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
-								.count());
+			.assertEquals(
+				2,
+				tmp
+					.filter(
+						s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 
 		Assertions
-				.assertEquals(
-						1,
-						tmp
-								.filter(
-										d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
-												&&
-												d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
-								.count());
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 
 		Assertions
-				.assertEquals(
-						1,
-						tmp
-								.filter(
-										d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
-												&&
-												d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
-								.count());
+			.assertEquals(
+				1,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 
 		Assertions
-				.assertEquals(
-						0,
-						tmp
-								.filter(
-										d -> d.getId().equals("50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7")
-												&&
-												d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
-								.count());
+			.assertEquals(
+				0,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 
 		Assertions
-				.assertEquals(
-						0,
-						tmp
-								.filter(
-										d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
-												&&
-												d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
-								.count());
+			.assertEquals(
+				0,
+				tmp
+					.filter(
+						d -> d.getId().equals("50|475c1990cbb2::3894c94123e96df8a21249957cf160cb")
+							&&
+							d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+					.count());
 	}
 
 	@Test
@@ -187,17 +189,16 @@ public class EOSCContextTaggingTest {
 			.option("compression", "gzip")
 			.json(workingDir.toString() + "/input/dataset");
 
-
 		spark
-			.read()
-			.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasource/datasource").getPath())
-			.map(
-				(MapFunction<String, Datasource>) value -> OBJECT_MAPPER.readValue(value, Datasource.class),
-				Encoders.bean(Datasource.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(workingDir.toString() + "/input/datasource");
+			.read()
+			.textFile(getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/datasource/datasource").getPath())
+			.map(
+				(MapFunction<String, Datasource>) value -> OBJECT_MAPPER.readValue(value, Datasource.class),
+				Encoders.bean(Datasource.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingDir.toString() + "/input/datasource");
 
 		SparkEoscBulkTag
 			.main(
@@ -211,13 +212,11 @@ public class EOSCContextTaggingTest {
 						.getResource("/eu/dnetlib/dhp/bulktag/eosc/datasourceMasterAssociation/datasourceMaster")
 						.getPath(),
 					"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Dataset",
-					"-resultType", "dataset"
+					"-resultType", "dataset"
 				});
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-
-
 		JavaRDD<Dataset> tmp = sc
 			.textFile(workingDir.toString() + "/input/dataset")
 			.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
@@ -232,7 +231,6 @@ public class EOSCContextTaggingTest {
 					.filter(
 						s -> s.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
 					.count());
 
-
 	}
 }
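A possible follow-up for a later cleanup pass (not part of this formatting change): the assertions in `EoscContextTagTest` repeat the same "tagged with the eosc context" filter five times. A hypothetical helper on the test class, written against the same `JavaRDD<Dataset>` the test already builds, would shrink each assertion to one line (`countTagged` does not exist in the codebase):

```java
// Dataset here is eu.dnetlib.dhp.schema.oaf.Dataset, as imported by the test.
// Passing null as resultId counts every record carrying the "eosc" context.
private static long countTagged(JavaRDD<Dataset> tmp, String resultId) {
	return tmp
		.filter(
			d -> (resultId == null || d.getId().equals(resultId))
				&& d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
		.count();
}
```

Usage would then read, for example, `Assertions.assertEquals(2, countTagged(tmp, null));` and `Assertions.assertEquals(1, countTagged(tmp, "50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea"));`.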