From f95d28868139995fd294023a45b90c3aef4a7081 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 7 May 2020 18:22:32 +0200 Subject: [PATCH] fixed swithch of parameters --- .../SparkRemoveBlacklistedRelationJob.java | 45 +++++++++++-------- .../dhp/blacklist/oozie_app/workflow.xml | 4 +- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java index d25272263..fe6002710 100644 --- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java +++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java @@ -62,8 +62,8 @@ public class SparkRemoveBlacklistedRelationJob { spark -> { removeBlacklistedRelations( spark, - inputPath, blacklistPath, + inputPath, outputPath, mergesPath); }); @@ -76,30 +76,34 @@ public class SparkRemoveBlacklistedRelationJob { Dataset inputRelation = readRelations(spark, inputPath); Dataset mergesRelation = readRelations(spark, mergesPath); + log.info("InputRelationCount: {}", inputRelation.count()); + Dataset dedupSource = blackListed .joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer") .map(c -> { - Optional merged = Optional.ofNullable(c._2()); - Relation bl = c._1(); - if (merged.isPresent()) { - bl.setSource(merged.get().getSource()); - } - return bl; + Optional + .ofNullable(c._2()) + .ifPresent(mr -> c._1().setSource(mr.getSource())); + return c._1(); }, Encoders.bean(Relation.class)); Dataset dedupBL = dedupSource .joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer") .map(c -> { - Optional merged = Optional.ofNullable(c._2()); - Relation bl = c._1(); - if (merged.isPresent()) { - bl.setTarget(merged.get().getSource()); - } - return bl; + Optional + .ofNullable(c._2()) + .ifPresent(mr -> c._1().setTarget(mr.getSource())); + return c._1(); }, Encoders.bean(Relation.class)); - inputRelation - .joinWith(dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), "left_outer") + dedupBL + .write() + .json(blacklistPath + "/deduped"); + + Dataset tmp = inputRelation + .joinWith( + dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), + "left_outer") .map(c -> { Relation ir = c._1(); Optional obl = Optional.ofNullable(c._2()); @@ -111,12 +115,15 @@ public class SparkRemoveBlacklistedRelationJob { return ir; }, Encoders.bean(Relation.class)) - .filter(r -> !(r == null)) - .toJSON() + .filter(r -> r != null); + + log.info("NumberOfRelationAfterBlacklisting: {} ", tmp.count()); + + tmp .write() .mode(SaveMode.Overwrite) - .option("conpression", "gzip") - .text(outputPath); + .option("compression", "gzip") + .json(outputPath); } diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml index 78bac5eaf..855cac65e 100644 --- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml @@ -62,7 +62,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${sourcePath} + --sourcePath${sourcePath}/relation --outputPath${workingDir}/mergesRelation --hive_metastore_uris${hive_metastore_uris} @@ -86,7 +86,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${sourcePath} + --sourcePath${sourcePath}/relation --outputPath${workingDir}/relation --hdfsPath${workingDir}/blacklist --mergesPath${workingDir}/mergesRelation