From 320cf02d967f86aa7f2971d5bb81aef714621d13 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 13 Jul 2021 18:13:32 +0200 Subject: [PATCH] Changed the way to find results linked to projects. We verify to actually have the project on the graph before selecting the result --- .../SparkResultLinkedToProject.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java index 1a28a21f43..9c4c73d851 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java @@ -24,7 +24,8 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Result; import scala.Tuple2; - +import org.apache.spark.sql.*; +import eu.dnetlib.dhp.schema.oaf.Project; /** * Selects the results linked to projects. Only for these results the dump will be performed. * The code to perform the dump and to expend the dumped results with the informaiton related to projects @@ -59,8 +60,8 @@ public class SparkResultLinkedToProject implements Serializable { final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); - final String relationPath = parser.get("relationPath"); - log.info("relationPath: {}", relationPath); + final String graphPath = parser.get("graphPath"); + log.info("graphPath: {}", graphPath); Class inputClazz = (Class) Class.forName(resultClassName); SparkConf conf = new SparkConf(); @@ -70,34 +71,32 @@ public class SparkResultLinkedToProject implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, relationPath); + writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, graphPath); }); } private static void writeResultsLinkedToProjects(SparkSession spark, Class inputClazz, - String inputPath, String outputPath, String relationPath) { + String inputPath, String outputPath, String graphPath) { Dataset results = Utils .readPath(spark, inputPath, inputClazz) .filter("dataInfo.deletedbyinference = false and datainfo.invisible = false"); Dataset relations = Utils - .readPath(spark, relationPath, Relation.class) + .readPath(spark, graphPath + "/relation", Relation.class) .filter( "dataInfo.deletedbyinference = false and lower(relClass) = '" + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'"); - relations - .joinWith( - results, relations.col("source").equalTo(results.col("id")), - "inner") - .groupByKey( - (MapFunction, String>) value -> value - ._2() - .getId(), - Encoders.STRING()) - .mapGroups((MapGroupsFunction, R>) (k, it) -> { - return it.next()._2(); - }, Encoders.bean(inputClazz)) + spark + .sql( + "Select res.* " + + "from relation rel " + + "join result res " + + "on rel.source = res.id " + + "join project p " + + "on rel.target = p.id " + + "") + .as(Encoders.bean(inputClazz)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip")