From 7e66bc25271c6d8b3b65833654b2e7f6ac68570a Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 11 May 2020 09:40:58 +0200
Subject: [PATCH] fix a typo in the compression keyword and add some logging
 info in the spark job

---
 .../blacklist/PrepareMergedRelationJob.java   | 11 ++--
 .../SparkRemoveBlacklistedRelationJob.java    | 61 +++++++++++++------
 2 files changed, 50 insertions(+), 22 deletions(-)

diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
index 7ac0a34132..d5c2b518ad 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/PrepareMergedRelationJob.java
@@ -67,11 +67,12 @@ public class PrepareMergedRelationJob {
 
 		Dataset<Relation> relation = readRelations(spark, inputPath);
 
-		relation.filter("relclass = 'merges' and datainfo.deletedbyinference=false")
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression","gizp")
-			.json(outputPath);
+		relation
+			.filter("relclass = 'merges' and datainfo.deletedbyinference=false")
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
 
 //		relation.createOrReplaceTempView("relation");
 //
 //		spark
diff --git a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
index 5bf9f5a3f4..1a47a07249 100644
--- a/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
+++ b/dhp-workflows/dhp-blacklist/src/main/java/eu/dnetlib/dhp/blacklist/SparkRemoveBlacklistedRelationJob.java
@@ -3,6 +3,7 @@ package eu.dnetlib.dhp.blacklist;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
@@ -18,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
 
 public class SparkRemoveBlacklistedRelationJob {
 	private static final Logger log = LoggerFactory.getLogger(SparkRemoveBlacklistedRelationJob.class);
@@ -78,8 +80,12 @@ public class SparkRemoveBlacklistedRelationJob {
 
 				log.info("InputRelationCount: {}", inputRelation.count());
 
+				log.info("NumberOfBlacklistedRelations: {}", blackListed.count());
+
 				Dataset<Relation> dedupSource = blackListed
-					.joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer")
+					.joinWith(
+						mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
+						"left_outer")
 					.map(c -> {
 						Optional
 							.ofNullable(c._2())
@@ -88,7 +94,9 @@
 					}, Encoders.bean(Relation.class));
 
 				Dataset<Relation> dedupBL = dedupSource
-					.joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer")
+					.joinWith(
+						mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")),
+						"left_outer")
 					.map(c -> {
 						Optional
 							.ofNullable(c._2())
@@ -98,28 +106,41 @@
 
 				dedupBL
 					.write()
+					.mode(SaveMode.Overwrite)
 					.json(blacklistPath + "/deduped");
 
-				Dataset<Relation> tmp = inputRelation
+				log.info("number of dedupedBL: {}", dedupBL.count());
+
+				Dataset<Tuple2<Relation, Relation>> tmp = inputRelation
 					.joinWith(
-						dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")),
-						"left_outer")
-					.map(c -> {
-						Relation ir = c._1();
-						Optional<Relation> obl = Optional.ofNullable(c._2());
-						if (obl.isPresent()) {
-							if (ir.equals(obl.get())) {
-								return null;
-							}
+						dedupBL, (inputRelation
+							.col("source")
+							.equalTo(dedupBL.col("source"))
+							.and(
+								inputRelation
+									.col("target")
+									.equalTo(dedupBL.col("target"))
+									.and(inputRelation.col("relclass").equalTo(dedupBL.col("relclass"))))),
+						"left_outer");
+
+				log.info("numberOfRelationAfterJoin: {}", tmp.count());
+
+				Dataset<Relation> tmp1 = tmp.map(c -> {
+					Relation ir = c._1();
+					Optional<Relation> obl = Optional.ofNullable(c._2());
+					if (obl.isPresent()) {
+						if (areEquals(ir, obl.get())) {
+							return null;
 						}
-						return ir;
+					}
+					return ir;
 
-					}, Encoders.bean(Relation.class))
-					.filter(r -> r != null);
+				}, Encoders.bean(Relation.class))
+					.filter(Objects::nonNull);
 
-				log.info("NumberOfRelationAfterBlacklisting: {} ", tmp.count());
+				log.info("NumberOfRelationAfterBlacklisting: {} ", tmp1.count());
 
-				tmp
+				tmp1
 					.write()
 					.mode(SaveMode.Overwrite)
 					.option("compression", "gzip")
@@ -127,6 +148,12 @@
 
 	}
 
+	private static boolean areEquals(Relation ir, Relation bl) {
+		return ir.getRelClass().equals(bl.getRelClass()) &&
+			ir.getRelType().equals(bl.getRelType()) &&
+			ir.getSubRelType().equals(bl.getSubRelType());
+	}
+
 	public static org.apache.spark.sql.Dataset<Relation> readRelations(
 		SparkSession spark, String inputPath) {
 		return spark
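
Note (illustration, not part of the patch): the core of SparkRemoveBlacklistedRelationJob above is a hand-rolled left anti-join -- a left_outer joinWith on (source, target, relclass) whose matched rows are mapped to null and then filtered out. The sketch below shows that same pattern in isolation; the SimpleRel bean, class name, and sample data are invented for the example and stand in for eu.dnetlib.dhp.schema.oaf.Relation.

	import java.io.Serializable;
	import java.util.Arrays;
	import java.util.Objects;

	import org.apache.spark.api.java.function.FilterFunction;
	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SparkSession;

	import scala.Tuple2;

	public class BlacklistAntiJoinSketch {

		// hypothetical stand-in for eu.dnetlib.dhp.schema.oaf.Relation
		public static class SimpleRel implements Serializable {
			private String source;
			private String target;
			private String relClass;

			public SimpleRel() {
			}

			public SimpleRel(String source, String target, String relClass) {
				this.source = source;
				this.target = target;
				this.relClass = relClass;
			}

			public String getSource() { return source; }
			public void setSource(String source) { this.source = source; }
			public String getTarget() { return target; }
			public void setTarget(String target) { this.target = target; }
			public String getRelClass() { return relClass; }
			public void setRelClass(String relClass) { this.relClass = relClass; }
		}

		public static void main(String[] args) {
			SparkSession spark = SparkSession
				.builder()
				.appName("blacklist-anti-join-sketch")
				.master("local[*]")
				.getOrCreate();

			Dataset<SimpleRel> input = spark
				.createDataset(
					Arrays.asList(
						new SimpleRel("10|a", "50|x", "isProducedBy"),
						new SimpleRel("10|b", "50|y", "isProducedBy")),
					Encoders.bean(SimpleRel.class));

			Dataset<SimpleRel> blacklist = spark
				.createDataset(
					Arrays.asList(new SimpleRel("10|a", "50|x", "isProducedBy")),
					Encoders.bean(SimpleRel.class));

			// left_outer joinWith on (source, target, relClass): rows without a
			// blacklist partner come back with a null _2
			Dataset<SimpleRel> cleaned = input
				.joinWith(
					blacklist, input
						.col("source")
						.equalTo(blacklist.col("source"))
						.and(input.col("target").equalTo(blacklist.col("target")))
						.and(input.col("relClass").equalTo(blacklist.col("relClass"))),
					"left_outer")
				// keep only the unmatched rows, i.e. drop everything blacklisted
				.map(
					(MapFunction<Tuple2<SimpleRel, SimpleRel>, SimpleRel>) t -> t._2() == null ? t._1() : null,
					Encoders.bean(SimpleRel.class))
				.filter((FilterFunction<SimpleRel>) Objects::nonNull);

			cleaned.show(); // only 10|b -> 50|y should survive

			spark.stop();
		}
	}

On untyped DataFrames the built-in "left_anti" join type would express the removal in a single step, but the two-step joinWith/filter form keeps the typed Dataset and, as the patch does, leaves room to log the intermediate join count.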