fix a typo in the compression keyword and added some logging info in the spark job

This commit is contained in:
Miriam Baglioni 2020-05-11 09:40:58 +02:00
parent 28556507e7
commit 7e66bc2527
2 changed files with 50 additions and 22 deletions

View File

@ -67,10 +67,11 @@ public class PrepareMergedRelationJob {
Dataset<Relation> relation = readRelations(spark, inputPath); Dataset<Relation> relation = readRelations(spark, inputPath);
relation.filter("relclass = 'merges' and datainfo.deletedbyinference=false") relation
.filter("relclass = 'merges' and datainfo.deletedbyinference=false")
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gizp") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
// relation.createOrReplaceTempView("relation"); // relation.createOrReplaceTempView("relation");
// //

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.blacklist;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -18,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class SparkRemoveBlacklistedRelationJob { public class SparkRemoveBlacklistedRelationJob {
private static final Logger log = LoggerFactory.getLogger(SparkRemoveBlacklistedRelationJob.class); private static final Logger log = LoggerFactory.getLogger(SparkRemoveBlacklistedRelationJob.class);
@ -78,8 +80,12 @@ public class SparkRemoveBlacklistedRelationJob {
log.info("InputRelationCount: {}", inputRelation.count()); log.info("InputRelationCount: {}", inputRelation.count());
log.info("NumberOfBlacklistedRelations: {}", blackListed.count());
Dataset<Relation> dedupSource = blackListed Dataset<Relation> dedupSource = blackListed
.joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer") .joinWith(
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
"left_outer")
.map(c -> { .map(c -> {
Optional Optional
.ofNullable(c._2()) .ofNullable(c._2())
@ -88,7 +94,9 @@ public class SparkRemoveBlacklistedRelationJob {
}, Encoders.bean(Relation.class)); }, Encoders.bean(Relation.class));
Dataset<Relation> dedupBL = dedupSource Dataset<Relation> dedupBL = dedupSource
.joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer") .joinWith(
mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")),
"left_outer")
.map(c -> { .map(c -> {
Optional Optional
.ofNullable(c._2()) .ofNullable(c._2())
@ -98,28 +106,41 @@ public class SparkRemoveBlacklistedRelationJob {
dedupBL dedupBL
.write() .write()
.mode(SaveMode.Overwrite)
.json(blacklistPath + "/deduped"); .json(blacklistPath + "/deduped");
Dataset<Relation> tmp = inputRelation log.info("number of dedupedBL: {}", dedupBL.count());
Dataset<Tuple2<Relation, Relation>> tmp = inputRelation
.joinWith( .joinWith(
dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), dedupBL, (inputRelation
"left_outer") .col("source")
.map(c -> { .equalTo(dedupBL.col("source"))
.and(
inputRelation
.col("target")
.equalTo(dedupBL.col("target"))
.and(inputRelation.col("relclass").equalTo(dedupBL.col("relclass"))))),
"left_outer");
log.info("numberOfRelationAfterJoin: {}", tmp.count());
Dataset<Relation> tmp1 = tmp.map(c -> {
Relation ir = c._1(); Relation ir = c._1();
Optional<Relation> obl = Optional.ofNullable(c._2()); Optional<Relation> obl = Optional.ofNullable(c._2());
if (obl.isPresent()) { if (obl.isPresent()) {
if (ir.equals(obl.get())) { if (areEquals(ir, obl.get())) {
return null; return null;
} }
} }
return ir; return ir;
}, Encoders.bean(Relation.class)) }, Encoders.bean(Relation.class))
.filter(r -> r != null); .filter(Objects::nonNull);
log.info("NumberOfRelationAfterBlacklisting: {} ", tmp.count()); log.info("NumberOfRelationAfterBlacklisting: {} ", tmp1.count());
tmp tmp1
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -127,6 +148,12 @@ public class SparkRemoveBlacklistedRelationJob {
} }
private static boolean areEquals(Relation ir, Relation bl) {
return ir.getRelClass().equals(bl.getRelClass()) &&
ir.getRelType().equals(bl.getRelType()) &&
ir.getSubRelType().equals(bl.getSubRelType());
}
public static org.apache.spark.sql.Dataset<Relation> readRelations( public static org.apache.spark.sql.Dataset<Relation> readRelations(
SparkSession spark, String inputPath) { SparkSession spark, String inputPath) {
return spark return spark