diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
index b4678cc6c..122e27dec 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJob.java
@@ -25,11 +25,13 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
 import eu.dnetlib.dhp.oa.graph.clean.country.CleanCountrySparkJob;
 import eu.dnetlib.dhp.schema.oaf.Instance;
 import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.utils.DHPUtils;
 import scala.Tuple2;
 
 public class CleanCfHbSparkJob {
@@ -76,6 +78,8 @@ public class CleanCfHbSparkJob {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
+				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+				HdfsSupport.remove(resolvedPath, spark.sparkContext().hadoopConfiguration());
 				cleanCfHb(
 					spark, inputPath, entityClazz, resolvedPath, dsMasterDuplicatePath, outputPath);
 			});
@@ -92,33 +96,15 @@ public class CleanCfHbSparkJob {
 
 		// prepare the resolved CF|HB references with the corresponding EMPTY master ID
 		Dataset<IdCfHbMapping> resolved = spark
-				.read()
-				.textFile(inputPath)
-				.map(as(entityClazz), Encoders.bean(entityClazz))
-				.flatMap(
-					(FlatMapFunction<T, IdCfHbMapping>) r -> {
-						final List<IdCfHbMapping> list = Stream
-							.concat(
-								r.getCollectedfrom().stream().map(KeyValue::getKey),
-								Stream
-									.concat(
-										r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
-										r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
-							.distinct()
-							.map(s -> asIdCfHbMapping(r.getId(), s))
-							.collect(Collectors.toList());
-						return list.iterator();
-					},
-					Encoders.bean(IdCfHbMapping.class));
+			.read()
+			.textFile(inputPath)
+			.map(as(entityClazz), Encoders.bean(entityClazz))
+			.flatMap(flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
 
 		// set the EMPTY master ID/NAME and save it
 		resolved
 			.joinWith(md, resolved.col("cfhb").equalTo(md.col("duplicateId")))
-				.map((MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping>) t -> {
-					t._1().setMasterId(t._2().getMasterId());
-					t._1().setMasterName(t._2().getMasterName());
-					return t._1();
-				}, Encoders.bean(IdCfHbMapping.class))
+			.map(asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.json(resolvedPath);
@@ -131,27 +117,46 @@ public class CleanCfHbSparkJob {
 
 		// read the result table
 		Dataset<T> res = spark
-				.read()
-				.textFile(inputPath)
-				.map(as(entityClazz), Encoders.bean(entityClazz));
+			.read()
+			.textFile(inputPath)
+			.map(as(entityClazz), Encoders.bean(entityClazz));
 
 		// Join the results with the resolved CF|HB mapping, apply the mapping and save it
 		res
 			.joinWith(resolvedDS, res.col("id").equalTo(resolvedDS.col("resultId")), "left")
 			.groupByKey((MapFunction<Tuple2<T, IdCfHbMapping>, String>) t -> t._1().getId(), Encoders.STRING())
 			.mapGroups(getMapGroupsFunction(), Encoders.bean(entityClazz))
-			//.agg(new IdCfHbMappingAggregator(entityClazz).toColumn())
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
 	}
 
-	@NotNull
+	private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
+		return t -> {
+			t._1().setMasterId(t._2().getMasterId());
+			t._1().setMasterName(t._2().getMasterName());
+			return t._1();
+		};
+	}
+
+	private static <T extends Result> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
+		return r -> Stream
+			.concat(
+				r.getCollectedfrom().stream().map(KeyValue::getKey),
+				Stream
+					.concat(
+						r.getInstance().stream().map(Instance::getHostedby).map(KeyValue::getKey),
+						r.getInstance().stream().map(Instance::getCollectedfrom).map(KeyValue::getKey)))
+			.distinct()
+			.map(s -> asIdCfHbMapping(r.getId(), s))
+			.iterator();
+	}
+
 	private static <T extends Result> MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T> getMapGroupsFunction() {
 		return new MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T>() {
 			@Override
-			public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) throws Exception {
+			public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) {
 				final Tuple2<T, IdCfHbMapping> first = values.next();
 				final T res = first._1();
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java
index 680d1ff64..b0097ed6f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/cfhb/CleanCfHbSparkJobTest.java
@@ -1,8 +1,16 @@
+
 package eu.dnetlib.dhp.oa.graph.clean.cfhb;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Publication;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -13,106 +21,193 @@ import org.junit.jupiter.api.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Publication;
 
 public class CleanCfHbSparkJobTest {
 
-    private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class);
+	private static final Logger log = LoggerFactory.getLogger(CleanCfHbSparkJobTest.class);
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    private static SparkSession spark;
+	private static SparkSession spark;
 
-    private static Path testBaseTmpPath;
+	private static Path testBaseTmpPath;
 
-    private static String resolvedPath;
+	private static String resolvedPath;
 
-    private static String graphInputPath;
+	private static String graphInputPath;
 
-    private static String graphOutputPath;
+	private static String graphOutputPath;
 
-    private static String dsMasterDuplicatePath;
+	private static String dsMasterDuplicatePath;
 
-    @BeforeAll
-    public static void beforeAll() throws IOException, URISyntaxException {
+	@BeforeAll
+	public static void beforeAll() throws IOException, URISyntaxException {
 
-        testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName());
-        log.info("using test base path {}", testBaseTmpPath);
+		testBaseTmpPath = Files.createTempDirectory(CleanCfHbSparkJobTest.class.getSimpleName());
+		log.info("using test base path {}", testBaseTmpPath);
 
-        final File entitiesSources = Paths
-            .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI())
-            .toFile();
+		final File entitiesSources = Paths
+			.get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/entities").toURI())
+			.toFile();
 
-        FileUtils
-            .copyDirectory(
-                entitiesSources,
-                testBaseTmpPath.resolve("input").resolve("entities").toFile());
+		FileUtils
+			.copyDirectory(
+				entitiesSources,
+				testBaseTmpPath.resolve("input").resolve("entities").toFile());
 
-        FileUtils
-            .copyFileToDirectory(
-                Paths
-                    .get(CleanCfHbSparkJobTest.class.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json").toURI())
-                    .toFile(),
-                testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile());
+		FileUtils
+			.copyFileToDirectory(
+				Paths
+					.get(
+						CleanCfHbSparkJobTest.class
+							.getResource("/eu/dnetlib/dhp/oa/graph/clean/cfhb/masterduplicate.json")
+							.toURI())
+					.toFile(),
+				testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toFile());
 
+		graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
+		resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString();
+		graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString();
+		dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString();
 
-        graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
-        resolvedPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbResolved").toString();
-        graphOutputPath = testBaseTmpPath.resolve("workingDir").resolve("cfHbPatched").toString();
-        dsMasterDuplicatePath = testBaseTmpPath.resolve("workingDir").resolve("masterduplicate").toString();
+		SparkConf conf = new SparkConf();
+		conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName());
 
-        SparkConf conf = new SparkConf();
-        conf.setAppName(CleanCfHbSparkJobTest.class.getSimpleName());
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("spark.ui.enabled", "false");
 
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("spark.ui.enabled", "false");
+		spark = SparkSession
+			.builder()
+			.appName(CleanCfHbSparkJobTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
 
-        spark = SparkSession
-            .builder()
-            .appName(CleanCfHbSparkJobTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(testBaseTmpPath.toFile());
+		spark.stop();
+	}
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(testBaseTmpPath.toFile());
-        spark.stop();
-    }
+	@Test
+	void testCleanCfHbSparkJob() throws Exception {
+		final String outputPath = graphOutputPath + "/dataset";
+		final String inputPath = graphInputPath + "/dataset";
 
-    @Test
-    void testCleanCfHbSparkJob() throws Exception {
-        final String outputPath = graphOutputPath + "/dataset";
-        CleanCfHbSparkJob
-            .main(
-                new String[] {
-                    "--isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "--inputPath", graphInputPath + "/dataset",
-                    "--outputPath", outputPath,
-                    "--resolvedPath", resolvedPath + "/dataset",
-                    "--graphTableClassName", Dataset.class.getCanonicalName(),
-                    "--datasourceMasterDuplicate", dsMasterDuplicatePath
-                });
+		org.apache.spark.sql.Dataset<Dataset> records = read(spark, inputPath, Dataset.class);
+		Dataset d = records
+			.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
+			.first();
+		assertEquals("10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getCollectedfrom().get(0).getKey());
+		assertEquals("Bacterial Protein Interaction Database - DUP", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|re3data_____::4c4416659cb74c2e0e891a883a047cbc", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals(
+			"Bacterial Protein Interaction Database - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
 
-        //final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		d = records
+			.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
+			.first();
+		assertEquals("10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getCollectedfrom().get(0).getKey());
+		assertEquals("FILUR DATA - DUP", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|opendoar____::788b4ac1e172d8e520c2b9461c0a3d35", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals("FILUR DATA - DUP", d.getInstance().get(0).getCollectedfrom().getValue());
+		assertEquals(
+			"10|re3data_____::6ffd7bc058f762912dc494cd9c175341", d.getInstance().get(0).getHostedby().getKey());
+		assertEquals("depositar - DUP", d.getInstance().get(0).getHostedby().getValue());
 
-        Assertions.assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));
+		d = records
+			.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
+			.first();
+		assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
+		assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals(
+			"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
+		assertEquals(
+			"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
+		assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
 
-        final org.apache.spark.sql.Dataset<Dataset> d = spark
-            .read()
-            .textFile(outputPath)
-            .map(as(Dataset.class), Encoders.bean(Dataset.class));
-        Assertions.assertEquals(3, d.count());
-
-    }
+		CleanCfHbSparkJob
+			.main(
+				new String[] {
+					"--isSparkSessionManaged", Boolean.FALSE.toString(),
+					"--inputPath", inputPath,
+					"--outputPath", outputPath,
+					"--resolvedPath", resolvedPath + "/dataset",
+					"--graphTableClassName", Dataset.class.getCanonicalName(),
+					"--datasourceMasterDuplicate", dsMasterDuplicatePath
+				});
 
-    private static <R> MapFunction<String, R> as(Class<R> clazz) {
-        return s -> OBJECT_MAPPER.readValue(s, clazz);
-    }
+		assertTrue(Files.exists(Paths.get(graphOutputPath, "dataset")));
+
+		records = read(spark, outputPath, Dataset.class);
+
+		assertEquals(3, records.count());
+
+		d = records
+			.filter("id = '50|doi_________::09821844208a5cd6300b2bfb13bca1b9'")
+			.first();
+		assertEquals("10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getCollectedfrom().get(0).getKey());
+		assertEquals("Bacterial Protein Interaction Database", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|fairsharing_::a29d1598024f9e87beab4b98411d48ce", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals("Bacterial Protein Interaction Database", d.getInstance().get(0).getCollectedfrom().getValue());
+
+		d = records
+			.filter("id = '50|DansKnawCris::0dd644304b7116e8e58da3a5e3adc37a'")
+			.first();
+		assertEquals("10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getCollectedfrom().get(0).getKey());
+		assertEquals("FULIR Data", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|re3data_____::fc1db64b3964826913b1e9eafe830490", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals("FULIR Data", d.getInstance().get(0).getCollectedfrom().getValue());
+		assertEquals(
+			"10|fairsharing_::3f647cadf56541fb9513cb63ec370187", d.getInstance().get(0).getHostedby().getKey());
+		assertEquals("depositar", d.getInstance().get(0).getHostedby().getValue());
+
+		d = records
+			.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
+			.first();
+		assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
+		assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals(
+			"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
+		assertEquals(
+			"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
+		assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
+
+		d = records
+			.filter("id = '50|DansKnawCris::203a27996ddc0fd1948258e5b7dec61c'")
+			.first();
+		assertEquals("10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getCollectedfrom().get(0).getKey());
+		assertEquals("DANS (Data Archiving and Networked Services)", d.getCollectedfrom().get(0).getValue());
+		assertEquals(
+			"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getCollectedfrom().getKey());
+		assertEquals(
+			"DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getCollectedfrom().getValue());
+		assertEquals(
+			"10|openaire____::c6df70599aa984f16ee52b4b86d2e89f", d.getInstance().get(0).getHostedby().getKey());
+		assertEquals("DANS (Data Archiving and Networked Services)", d.getInstance().get(0).getHostedby().getValue());
+	}
+
+	private <R> org.apache.spark.sql.Dataset<R> read(SparkSession spark, String path, Class<R> clazz) {
+		return spark
+			.read()
+			.textFile(path)
+			.map(as(clazz), Encoders.bean(clazz));
+	}
+
+	private static <R> MapFunction<String, R> as(Class<R> clazz) {
+		return s -> OBJECT_MAPPER.readValue(s, clazz);
+	}
 }
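Note on the two HdfsSupport.remove(...) calls added at the top of the Spark session block: the intent appears to be making the job safely re-runnable, clearing the resolved and patched output locations before they are rewritten even if a previous attempt left partial data behind. Below is a minimal sketch of that behaviour using only the standard Hadoop FileSystem API; the class name and the implementation are assumptions for illustration, not the actual eu.dnetlib.dhp.common.HdfsSupport code.

// Illustrative sketch only: approximates what HdfsSupport.remove(path, conf) is
// expected to do, i.e. recursively delete the target directory if it exists.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RemovePathSketch {

	public static void remove(String location, Configuration conf) throws IOException {
		final FileSystem fs = FileSystem.get(conf);
		final Path path = new Path(location);
		if (fs.exists(path)) {
			// recursive delete, roughly equivalent to 'hdfs dfs -rm -r <location>'
			fs.delete(path, true);
		}
	}
}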