diff --git a/dhp-workflows/dhp-blacklist/pom.xml b/dhp-workflows/dhp-blacklist/pom.xml
index 6854ea98e..497516d85 100644
--- a/dhp-workflows/dhp-blacklist/pom.xml
+++ b/dhp-workflows/dhp-blacklist/pom.xml
@@ -11,6 +11,7 @@
     <artifactId>dhp-blacklist</artifactId>
 
+            <groupId>eu.dnetlib.dhp</groupId>
             <artifactId>dhp-graph-mapper</artifactId>
 
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
index 3d4d4315e..296c49a42 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
@@ -1,9 +1,10 @@
+
 package eu.dnetlib.dhp.blacklist;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+
+import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -14,78 +15,78 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 
 public class PrepareMergedRelationJob {
 
-    private static final Logger log = LoggerFactory.getLogger(PrepareMergedRelationJob.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final Logger log = LoggerFactory.getLogger(PrepareMergedRelationJob.class);
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public static void main(String[] args) throws Exception {
+	public static void main(String[] args) throws Exception {
 
-        String jsonConfiguration = IOUtils
-            .toString(
-                PrepareMergedRelationJob.class
-                    .getResourceAsStream(
-                        "/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json"));
+		String jsonConfiguration = IOUtils
+			.toString(
+				PrepareMergedRelationJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/blacklist/input_preparerelation_parameters.json"));
 
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
-        parser.parseArgument(args);
+		parser.parseArgument(args);
 
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-        String inputPath = parser.get("sourcePath");
-        log.info("inputPath: {}", inputPath);
+		String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
 
-        String outputPath = parser.get("outputPath");
-        log.info("outputPath: {} " , outputPath);
+		String outputPath = parser.get("outputPath");
+		log.info("outputPath: {} ", outputPath);
 
-        SparkConf conf = new SparkConf();
-        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+		SparkConf conf = new SparkConf();
+		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
-        runWithSparkHiveSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                // removeOutputDir(spark, potentialUpdatePath);
-                // removeOutputDir(spark, alreadyLinkedPath);
-                selectMergesRelations(
-                    spark,
-                    inputPath,
-                    outputPath);
-            });
-    }
+		runWithSparkHiveSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				selectMergesRelations(
+					spark,
+					inputPath,
+					outputPath);
+			});
+	}
 
-    private static void selectMergesRelations(SparkSession spark, String inputPath, String outputPath) {
+	private static void selectMergesRelations(SparkSession spark, String inputPath, String outputPath) {
 
-        Dataset<Relation> relation = readRelations(spark, inputPath);
-        relation.createOrReplaceTempView("relation");
+		Dataset<Relation> relation = readRelations(spark, inputPath);
+		relation.createOrReplaceTempView("relation");
 
-        spark.sql("Select * from relation where relclass = 'merges' and datainfo.deletedbyinference = false")
-            .as(Encoders.bean(Relation.class))
-            .toJSON()
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression","gzip")
-            .text(outputPath);
-    }
+		spark
+			.sql("Select * from relation " +
+				"where relclass = 'merges' " +
+				"and datainfo.deletedbyinference = false")
+			.as(Encoders.bean(Relation.class))
+			.toJSON()
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.text(outputPath);
+	}
 
-    public static org.apache.spark.sql.Dataset<Relation> readRelations(
-        SparkSession spark, String inputPath) {
-        return spark
-            .read()
-            .textFile(inputPath)
-            .map(
-                (MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
-                Encoders.bean(Relation.class));
-    }
+	public static org.apache.spark.sql.Dataset<Relation> readRelations(
+		SparkSession spark, String inputPath) {
+		return spark
+			.read()
+			.textFile(inputPath)
+			.map(
+				(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
+				Encoders.bean(Relation.class));
+	}
 }
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
index 2dacb542b..86dd0fbd3 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
@@ -55,19 +55,20 @@ public class SparkRemoveBlacklistedRelationJob {
 		log.info("mergesPath {}: ", mergesPath);
 
 		SparkConf conf = new SparkConf();
-		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
 		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				removeBlacklistedRelations(
-					spark,
-					inputPath,
-					blacklistPath,
-					outputPath,
-					mergesPath);
-			});
+				conf,
+				isSparkSessionManaged,
+				spark -> {
+					removeBlacklistedRelations(
+						spark,
+						inputPath,
+						blacklistPath,
+						outputPath,
+						mergesPath);
+				});
+
 	}
 
 	private static void removeBlacklistedRelations(SparkSession spark, String blacklistPath, String inputPath,
@@ -78,7 +79,6 @@ public class SparkRemoveBlacklistedRelationJob {
 
 		Dataset<Relation> dedupSource = blackListed
 			.joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer")
-			// .joinWith(inputRelation,blackListed.col("target").equalTo(inputRelation.col("target")),"left_outer")
 			.map(c -> {
 				Optional<Relation> merged = Optional.ofNullable(c._2());
 				Relation bl = c._1();
diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml
index 48351e3fb..78bac5eaf 100644
--- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml
@@ -63,7 +63,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/mergesRelation</arg>
             <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
 
@@ -89,6 +89,7 @@
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
             <arg>--hdfsPath</arg><arg>${workingDir}/blacklist</arg>
+            <arg>--mergesPath</arg><arg>${workingDir}/mergesRelation</arg>
 
diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json
index 69de24213..91a87b8b5 100644
--- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json
+++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json
@@ -22,5 +22,12 @@
     "paramLongName": "isSparkSessionManaged",
     "paramDescription": "true if the spark session is managed",
     "paramRequired": false
+  },
+  {
+    "paramName": "m",
+    "paramLongName": "mergesPath",
+    "paramDescription": "the path where the merges relations have been stored",
+    "paramRequired": true
+  }
 ]
\ No newline at end of file
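
The new mergesPath parameter is wired end to end by these last two files: workflow.xml now passes --mergesPath ${workingDir}/mergesRelation (the output of the prepare step) to the blacklist job, and sparkblacklist_parameters.json declares the -m/--mergesPath flag so ArgumentApplicationParser will accept it. Inside SparkRemoveBlacklistedRelationJob the read presumably looks like the short sketch below, which mirrors the parser calls visible in PrepareMergedRelationJob; the surrounding code is not part of this diff and is shown only as an assumption.

		// sketch only: reading the new flag in SparkRemoveBlacklistedRelationJob,
		// mirroring the ArgumentApplicationParser usage shown in PrepareMergedRelationJob
		String jsonConfiguration = IOUtils
			.toString(
				SparkRemoveBlacklistedRelationJob.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		String mergesPath = parser.get("mergesPath"); // bound to --mergesPath ${workingDir}/mergesRelation
		log.info("mergesPath: {}", mergesPath);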