diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java index d252722637..fe60027102 100644 --- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java +++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java @@ -62,8 +62,8 @@ public class SparkRemoveBlacklistedRelationJob { spark -> { removeBlacklistedRelations( spark, - inputPath, blacklistPath, + inputPath, outputPath, mergesPath); }); @@ -76,30 +76,34 @@ public class SparkRemoveBlacklistedRelationJob { Dataset inputRelation = readRelations(spark, inputPath); Dataset mergesRelation = readRelations(spark, mergesPath); + log.info("InputRelationCount: {}", inputRelation.count()); + Dataset dedupSource = blackListed .joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer") .map(c -> { - Optional merged = Optional.ofNullable(c._2()); - Relation bl = c._1(); - if (merged.isPresent()) { - bl.setSource(merged.get().getSource()); - } - return bl; + Optional + .ofNullable(c._2()) + .ifPresent(mr -> c._1().setSource(mr.getSource())); + return c._1(); }, Encoders.bean(Relation.class)); Dataset dedupBL = dedupSource .joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer") .map(c -> { - Optional merged = Optional.ofNullable(c._2()); - Relation bl = c._1(); - if (merged.isPresent()) { - bl.setTarget(merged.get().getSource()); - } - return bl; + Optional + .ofNullable(c._2()) + .ifPresent(mr -> c._1().setTarget(mr.getSource())); + return c._1(); }, Encoders.bean(Relation.class)); - inputRelation - .joinWith(dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), "left_outer") + dedupBL + .write() + .json(blacklistPath + "/deduped"); + + Dataset tmp = inputRelation + .joinWith( + dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), + "left_outer") .map(c -> { Relation ir = c._1(); Optional obl = Optional.ofNullable(c._2()); @@ -111,12 +115,15 @@ public class SparkRemoveBlacklistedRelationJob { return ir; }, Encoders.bean(Relation.class)) - .filter(r -> !(r == null)) - .toJSON() + .filter(r -> r != null); + + log.info("NumberOfRelationAfterBlacklisting: {} ", tmp.count()); + + tmp .write() .mode(SaveMode.Overwrite) - .option("conpression", "gzip") - .text(outputPath); + .option("compression", "gzip") + .json(outputPath); } diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml index 78bac5eafe..855cac65eb 100644 --- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml @@ -62,7 +62,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${sourcePath} + --sourcePath${sourcePath}/relation --outputPath${workingDir}/mergesRelation --hive_metastore_uris${hive_metastore_uris} @@ -86,7 +86,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePath${sourcePath} + --sourcePath${sourcePath}/relation --outputPath${workingDir}/relation --hdfsPath${workingDir}/blacklist --mergesPath${workingDir}/mergesRelation