From 848aabbb6cef47e941f739f475521011dfee6be9 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Sun, 25 Jul 2021 12:06:41 +0200 Subject: [PATCH] minor fix --- .../main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala | 1 + .../java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala index ba483bfb2..0a7fc18fb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala @@ -42,6 +42,7 @@ object SparkCreateScholix { val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation] + .filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge")) .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder)) val summaryDS: Dataset[(String, ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary] diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala index ac189b6ba..0970375f5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala @@ -34,7 +34,7 @@ object SparkCreateSummaryObject { implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary] - val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Oaf].filter(r => r.isInstanceOf[Result]).map(r => r.asInstanceOf[Result]) + val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result].filter(r=>r.getDataInfo== null || r.getDataInfo.getDeletedbyinference== false) ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)