From 757bae53ea4cfdd19a6e3c89e4d3fa169fe2e914 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 11 May 2020 14:28:37 +0200
Subject: [PATCH] removed unusefule serialization points

---
 .../SparkRemoveBlacklistedRelationJob.java    | 26 ++++---------------------
 1 file changed, 5 insertions(+), 21 deletions(-)

diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
index 6abfc70bd..c5104058c 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
@@ -80,8 +80,6 @@ public class SparkRemoveBlacklistedRelationJob {
 
 		log.info("InputRelationCount: {}", inputRelation.count());
 
-		log.info("NumberOfBlacklistedRelations: {}", blackListed.count());
-
 		Dataset<Relation> dedupSource = blackListed
 			.joinWith(
 				mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
@@ -109,9 +107,8 @@ public class SparkRemoveBlacklistedRelationJob {
 			.mode(SaveMode.Overwrite)
 			.json(blacklistPath + "/deduped");
-		log.info("number of dedupedBL: {}", dedupBL.count());
-		Dataset<Tuple2<Relation, Relation>> tmp = inputRelation
+		inputRelation
 			.joinWith(
 				dedupBL, (inputRelation
 					.col("source")
 					.equalTo(dedupBL.col("source"))
 					.and(
@@ -120,26 +117,19 @@ public class SparkRemoveBlacklistedRelationJob {
 						inputRelation
 							.col("target")
 							.equalTo(dedupBL.col("target")))),
-				"left_outer");
-
-		log.info("numberOfRelationAfterJoin: {}", tmp.count());
-
-		Dataset<Relation> tmp1 = tmp.map(c -> {
+				"left_outer")
+			.map(c -> {
 				Relation ir = c._1();
 				Optional<Relation> obl = Optional.ofNullable(c._2());
 				if (obl.isPresent()) {
-					if (areEquals(ir, obl.get())) {
+					if (ir.equals(obl.get())) {
 						return null;
 					}
 				}
 				return ir;
 			}, Encoders.bean(Relation.class))
-			.filter(Objects::nonNull);
-
-		log.info("NumberOfRelationAfterBlacklisting: {} ", tmp1.count());
-
-		tmp1
+			.filter(Objects::nonNull)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
 	}
 
-	private static boolean areEquals(Relation ir, Relation bl) {
-		return ir.getRelClass().equals(bl.getRelClass()) &&
-			ir.getRelType().equals(bl.getRelType()) &&
-			ir.getSubRelType().equals(bl.getSubRelType()) &&
-			ir.getRelClass().equals(bl.getRelClass());
-	}
 
 	public static org.apache.spark.sql.Dataset<Relation> readRelations(
 		SparkSession spark, String inputPath) {
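
The change above folds the former join, map, filter and write steps into one chained Dataset expression and switches the blacklist comparison from the removed areEquals helper to Relation.equals; each dropped count()-backed log line was a separate Spark action, so removing them avoids re-evaluating the joined data just for logging. Below is a minimal, self-contained sketch of the resulting pattern, not the patched class itself: the Relation import path, the class and method names, and the explicit MapFunction/FilterFunction casts are assumptions added so the example compiles on its own.

import java.util.Objects;
import java.util.Optional;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;

import scala.Tuple2;

import eu.dnetlib.dhp.schema.oaf.Relation; // assumed import path for the Relation bean

// Hypothetical helper class, not part of the patch: it only illustrates the chained pipeline.
public class BlacklistSubtractionSketch {

	public static void subtractBlacklisted(
		Dataset<Relation> inputRelation, Dataset<Relation> dedupBL, String outputPath) {

		inputRelation
			// left-outer join: every input relation survives, blacklisted ones get a non-null right side
			.joinWith(
				dedupBL,
				inputRelation
					.col("source")
					.equalTo(dedupBL.col("source"))
					.and(
						inputRelation
							.col("target")
							.equalTo(dedupBL.col("target"))),
				"left_outer")
			// map blacklisted pairs to null; Relation.equals replaces the removed areEquals helper
			.map((MapFunction<Tuple2<Relation, Relation>, Relation>) pair -> {
				Relation relation = pair._1();
				Optional<Relation> blacklisted = Optional.ofNullable(pair._2());
				if (blacklisted.isPresent() && relation.equals(blacklisted.get())) {
					return null;
				}
				return relation;
			}, Encoders.bean(Relation.class))
			// drop the nulls and write the surviving relations in a single action
			.filter((FilterFunction<Relation>) Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);
	}
}

A driver would call subtractBlacklisted(inputRelation, dedupBL, outputPath) once both datasets are loaded; in the patched job they come from HDFS paths read via readRelations, as the last hunk shows.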