diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java index c2bcf69f0..5523863ff 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationsApplication.java @@ -1,10 +1,12 @@ + package eu.dnetlib.dhp.oa.graph.raw; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.oa.graph.raw.common.RelationIdMapping; -import eu.dnetlib.dhp.schema.oaf.Relation; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.FileNotFoundException; +import java.util.Objects; +import java.util.Optional; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -15,101 +17,111 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.raw.common.RelationIdMapping; +import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; -import java.io.FileNotFoundException; -import java.util.Objects; -import java.util.Optional; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - public class PatchRelationsApplication { - private static final Logger log = LoggerFactory.getLogger(PatchRelationsApplication.class); + private static final Logger log = LoggerFactory.getLogger(PatchRelationsApplication.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - Optional.ofNullable( - PatchRelationsApplication.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json")) - .orElseThrow(FileNotFoundException::new) - )); - parser.parseArgument(args); + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + Optional + .ofNullable( + PatchRelationsApplication.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/patch_relations_parameters.json")) + .orElseThrow(FileNotFoundException::new))); + parser.parseArgument(args); - final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String graphBasePath = parser.get("graphBasePath"); - log.info("graphBasePath: {}", graphBasePath); + final String graphBasePath = parser.get("graphBasePath"); + log.info("graphBasePath: {}", graphBasePath); - final String workingDir = parser.get("workingDir"); - log.info("workingDir: {}", workingDir); + final String workingDir = parser.get("workingDir"); + log.info("workingDir: {}", workingDir); - final String idMappingPath = parser.get("idMappingPath"); - log.info("idMappingPath: {}", idMappingPath); + final String idMappingPath = parser.get("idMappingPath"); + log.info("idMappingPath: {}", idMappingPath); - final SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> patchRelations(spark, graphBasePath, workingDir, idMappingPath)); - } + final SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> patchRelations(spark, graphBasePath, workingDir, idMappingPath)); + } - /** - * Substitutes the identifiers (source/target) from the set of relations part of the graphBasePath included in the - * mapping provided by the dataset stored on idMappingPath, using workingDir as intermediate storage location. - * - * @param spark the SparkSession - * @param graphBasePath base graph path providing the set of relations to patch - * @param workingDir intermediate storage location - * @param idMappingPath dataset providing the old -> new identifier mapping - */ - private static void patchRelations(final SparkSession spark, final String graphBasePath, final String workingDir, final String idMappingPath) { + /** + * Substitutes the identifiers (source/target) from the set of relations part of the graphBasePath included in the + * mapping provided by the dataset stored on idMappingPath, using workingDir as intermediate storage location. + * + * @param spark the SparkSession + * @param graphBasePath base graph path providing the set of relations to patch + * @param workingDir intermediate storage location + * @param idMappingPath dataset providing the old -> new identifier mapping + */ + private static void patchRelations(final SparkSession spark, final String graphBasePath, final String workingDir, + final String idMappingPath) { - final String relationPath = graphBasePath + "/relation"; + final String relationPath = graphBasePath + "/relation"; - final Dataset rels = Utils.readPath(spark, relationPath, Relation.class); - final Dataset idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class); + final Dataset rels = Utils.readPath(spark, relationPath, Relation.class); + final Dataset idMapping = Utils.readPath(spark, idMappingPath, RelationIdMapping.class); - rels - .joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left") - .map((MapFunction, Relation>) t -> { - final Relation r = t._1(); - Optional.ofNullable(t._2()) - .map(RelationIdMapping::getNewId) - .ifPresent(r::setSource); - return r; - }, Encoders.bean(Relation.class)) - .joinWith(idMapping, rels.col("target").equalTo(idMapping.col("oldId")), "left") - .map((MapFunction, Relation>) t -> { - final Relation r = t._1(); - Optional.ofNullable(t._2()) - .map(RelationIdMapping::getNewId) - .ifPresent(r::setTarget); - return r; - }, Encoders.bean(Relation.class)) - .map( - (MapFunction) OBJECT_MAPPER::writeValueAsString, - Encoders.STRING()) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(workingDir); + log.info("relations: {}", rels.count()); + log.info("idMapping: {}", idMapping.count()); - spark.read().textFile(workingDir) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(relationPath); - } + final Dataset bySource = rels + .joinWith(idMapping, rels.col("source").equalTo(idMapping.col("oldId")), "left") + .map((MapFunction, Relation>) t -> { + final Relation r = t._1(); + Optional + .ofNullable(t._2()) + .map(RelationIdMapping::getNewId) + .ifPresent(r::setSource); + return r; + }, Encoders.bean(Relation.class)); + bySource + .joinWith(idMapping, bySource.col("target").equalTo(idMapping.col("oldId")), "left") + .map((MapFunction, Relation>) t -> { + final Relation r = t._1(); + Optional + .ofNullable(t._2()) + .map(RelationIdMapping::getNewId) + .ifPresent(r::setTarget); + return r; + }, Encoders.bean(Relation.class)) + .map( + (MapFunction) OBJECT_MAPPER::writeValueAsString, + Encoders.STRING()) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .text(workingDir); + + spark + .read() + .textFile(workingDir) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .text(relationPath); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java index f251da8c3..d5852ab70 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/RelationIdMapping.java @@ -1,24 +1,25 @@ + package eu.dnetlib.dhp.oa.graph.raw.common; public class RelationIdMapping { - private String oldId; + private String oldId; - private String newId; + private String newId; - public String getOldId() { - return oldId; - } + public String getOldId() { + return oldId; + } - public void setOldId(final String oldId) { - this.oldId = oldId; - } + public void setOldId(final String oldId) { + this.oldId = oldId; + } - public String getNewId() { - return newId; - } + public String getNewId() { + return newId; + } - public void setNewId(final String newId) { - this.newId = newId; - } + public void setNewId(final String newId) { + this.newId = newId; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationApplicationTest.java new file mode 100644 index 000000000..3fd365416 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/PatchRelationApplicationTest.java @@ -0,0 +1,115 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class PatchRelationApplicationTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static final String ID_MAPPING_PATH = "map/id_mapping.json"; + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory.getLogger(PatchRelationApplicationTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(PatchRelationApplicationTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PatchRelationApplicationTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PatchRelationApplicationTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + + FileUtils + .copyInputStreamToFile( + PatchRelationApplicationTest.class.getResourceAsStream("id_mapping.json"), + workingDir.resolve(ID_MAPPING_PATH).toFile()); + + FileUtils + .copyInputStreamToFile( + PatchRelationApplicationTest.class.getResourceAsStream("relations_to_patch.json"), + workingDir.resolve("graphBasePath/relation/rels.json").toFile()); + + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testPatchRelationApplication() throws Exception { + + final String graphBasePath = workingDir.toString() + "/graphBasePath"; + PatchRelationsApplication.main(new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-graphBasePath", graphBasePath, + "-workingDir", workingDir.toString() + "/workingDir", + "-idMappingPath", workingDir.toString() + "/" + ID_MAPPING_PATH + }); + + final List rels = spark + .read() + .textFile(graphBasePath + "/relation") + .map( + (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), + Encoders.bean(Relation.class)) + .collectAsList(); + + assertEquals(6, rels.size()); + + assertEquals(0, getCount(rels, "1a"), "should be patched to 1b"); + assertEquals(0, getCount(rels, "2a"), "should be patched to 2b"); + + assertEquals(2, getCount(rels, "10a"), "not included in patching"); + assertEquals(2, getCount(rels, "20a"), "not included in patching"); + + assertEquals(2, getCount(rels, "15a"), "not included in patching"); + assertEquals(2, getCount(rels, "25a"), "not included in patching"); + + assertEquals(2, getCount(rels, "1b"), "patched from 1a"); + assertEquals(2, getCount(rels, "2b"), "patched from 2a"); + } + + private long getCount(List rels, final String id) { + return rels.stream().filter(r -> r.getSource().equals(id) || r.getTarget().equals(id)).count(); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/id_mapping.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/id_mapping.json new file mode 100644 index 000000000..640d042b1 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/id_mapping.json @@ -0,0 +1,5 @@ +{"oldId": "1a", "newId": "1b"} +{"oldId": "2a", "newId": "2b"} +{"oldId": "3a", "newId": "3b"} +{"oldId": "4a", "newId": "4b"} +{"oldId": "5a", "newId": "5b"} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/relations_to_patch.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/relations_to_patch.json new file mode 100644 index 000000000..31755c53d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/relations_to_patch.json @@ -0,0 +1,6 @@ +{"source":"1a","target":"10a","collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1626336932282,"relType":"datasourceOrganization","subRelType":"provision","relClass":"provides","validated":false,"validationDate":null,"properties":[]} +{"source":"10a","target":"1a","collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1626336932282,"relType":"datasourceOrganization","subRelType":"provision","relClass":"provides","validated":false,"validationDate":null,"properties":[]} +{"source":"2a","target":"20a","collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1626336932282,"relType":"datasourceOrganization","subRelType":"provision","relClass":"provides","validated":false,"validationDate":null,"properties":[]} +{"source":"20a","target":"2a","collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1626336932282,"relType":"datasourceOrganization","subRelType":"provision","relClass":"provides","validated":false,"validationDate":null,"properties":[]} +{"source":"15a","target":"25a","collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1626336932282,"relType":"datasourceOrganization","subRelType":"provision","relClass":"provides","validated":false,"validationDate":null,"properties":[]} +{"source":"25a","target":"15a","collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1626336932282,"relType":"datasourceOrganization","subRelType":"provision","relClass":"provides","validated":false,"validationDate":null,"properties":[]} \ No newline at end of file