From d6d6ebeae5a998316f20faf7c3fe81c42e85e40e Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 30 Apr 2020 18:25:33 +0200
Subject: [PATCH] preparation step: create the subset of 'merges' relations

---
 .../blacklist/PrepareMergedRelationJob.java   | 87 +++++++++++++++++++
 .../SparkRemoveBlacklistedRelationJob.java    |  4 +
 .../input_preparerelation_parameters.json     | 32 ++++++
 .../blacklist/sparkblacklist_parameters.json  | 32 +++++++
 4 files changed, 155 insertions(+)
 create mode 100644 dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
 create mode 100644 dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
 create mode 100644 dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json
 create mode 100644 dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json

diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
new file mode 100644
index 000000000..3d4d4315e
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
@@ -0,0 +1,87 @@
+package eu.dnetlib.dhp.blacklist;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+public class PrepareMergedRelationJob {
+
+	private static final Logger log = LoggerFactory.getLogger(PrepareMergedRelationJob.class);
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	public static void main(String[] args) throws Exception {
+
+		String jsonConfiguration = IOUtils
+			.toString(
+				PrepareMergedRelationJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		SparkConf conf = new SparkConf();
+		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+
+		runWithSparkHiveSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				selectMergesRelations(
+					spark,
+					inputPath,
+					outputPath);
+			});
+	}
+
+	private static void selectMergesRelations(SparkSession spark, String inputPath, String outputPath) {
+
+		Dataset<Relation> relation = readRelations(spark, inputPath);
+		relation.createOrReplaceTempView("relation");
+
+		// keep only the 'merges' relations that have not been deleted by inference
+		spark.sql("SELECT * FROM relation WHERE relclass = 'merges' AND datainfo.deletedbyinference = false")
+			.as(Encoders.bean(Relation.class))
+			.toJSON()
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.text(outputPath);
+	}
+
+	public static Dataset<Relation> readRelations(
+		SparkSession spark, String inputPath) {
+		return spark
+			.read()
+			.textFile(inputPath)
+			.map(
+				(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
+				Encoders.bean(Relation.class));
+	}
+}
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
new file mode 100644
index 000000000..348468f51
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
@@ -0,0 +1,4 @@
+package eu.dnetlib.dhp.blacklist;
+
+public class SparkRemoveBlacklistedRelationJob {
+}
diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json
new file mode 100644
index 000000000..69de24213
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json
@@ -0,0 +1,32 @@
+[
+  {
+    "paramName": "p",
+    "paramLongName": "hdfsPath",
+    "paramDescription": "the path where to store the sequential file",
+    "paramRequired": true
+  },
+  {
+    "paramName": "s",
+    "paramLongName": "sourcePath",
+    "paramDescription": "the path to the graph used to remove the relations",
+    "paramRequired": true
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path where to store the temporary result",
+    "paramRequired": true
+  },
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed",
+    "paramRequired": false
+  },
+  {
+    "paramName": "hmu",
+    "paramLongName": "hive_metastore_uris",
+    "paramDescription": "the hive metastore uris",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json
new file mode 100644
index 000000000..9a2eadaa7
--- /dev/null
+++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json
@@ -0,0 +1,32 @@
+[
+  {
+    "paramName": "p",
+    "paramLongName": "hdfsPath",
+    "paramDescription": "the path where to store the sequential file",
+    "paramRequired": true
+  },
+  {
+    "paramName": "nn",
+    "paramLongName": "hdfsNameNode",
+    "paramDescription": "the HDFS name node",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pgurl",
+    "paramLongName": "postgresUrl",
+    "paramDescription": "the postgres url, e.g. jdbc:postgresql://localhost:5432/testdb",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pguser",
+    "paramLongName": "postgresUser",
+    "paramDescription": "the postgres user",
+    "paramRequired": false
+  },
+  {
+    "paramName": "pgpasswd",
+    "paramLongName": "postgresPassword",
+    "paramDescription": "the postgres password",
+    "paramRequired": false
+  }
+]
\ No newline at end of file
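
Note: the sketch below is not part of the patch. It is a minimal, hypothetical local driver showing how the argument names declared in input_preparerelation_parameters.json map onto what PrepareMergedRelationJob parses. All paths and the metastore URI are illustrative assumptions; hdfsPath is passed only because the parameter file declares it required, even though this job does not read it.

package eu.dnetlib.dhp.blacklist;

// Hypothetical smoke-test driver, not part of the patch: every path and the
// metastore URI below are illustrative assumptions.
public class PrepareMergedRelationJobLocalRunner {

	public static void main(String[] args) throws Exception {
		// new SparkConf() picks up spark.* system properties, so setting the
		// master here lets the job run outside of spark-submit
		System.setProperty("spark.master", "local[*]");

		PrepareMergedRelationJob
			.main(
				new String[] {
					"--isSparkSessionManaged", "true", // the helper stops the session on exit
					"--hdfsPath", "/tmp/blacklist/seq", // declared required, unused by this job
					"--sourcePath", "/tmp/graph/relation", // one JSON-serialized Relation per line
					"--outputPath", "/tmp/blacklist/mergesRelation",
					"--hive_metastore_uris", "thrift://localhost:9083"
				});
	}
}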