From b9b6bdb2e67f99d14abeaca83783d3450b015fb4 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 24 Nov 2020 14:44:53 +0100
Subject: [PATCH] fixing issue on previous implementation

---
 .../funderresults/SparkDumpFunderResults.java | 55 +++++++++++++------
 .../SparkResultLinkedToProject.java           | 19 +++++--
 2 files changed, 53 insertions(+), 21 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
index acafa968a9..740a4245de 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java
@@ -9,10 +9,7 @@ import java.util.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -22,6 +19,7 @@ import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
 import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
+import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import scala.Tuple2;
 
@@ -69,11 +67,12 @@ public class SparkDumpFunderResults implements Serializable {
         });
     }
 
-    private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath, String relationPath) {
+    private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
+        String relationPath) {
 
         Dataset<Relation> relation = Utils
             .readPath(spark, relationPath + "/relation", Relation.class)
-            .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
+            .filter("dataInfo.deletedbyinference = false and relClass = 'isProducedBy'");
 
         Dataset<CommunityResult> result = Utils
             .readPath(spark, inputPath + "/publication", CommunityResult.class)
@@ -81,18 +80,40 @@
             .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
             .union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
 
-        result
-            .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
-            .map((MapFunction<Tuple2<CommunityResult, Relation>, FunderResults>) value -> {
-                FunderResults res = (FunderResults) value._1();
-                res.setFunder_id(value._2().getSource().substring(3, 15));
-                return res;
-            }, Encoders.bean(FunderResults.class))
-            .write()
-            .partitionBy("funder_id")
-            .mode(SaveMode.Overwrite)
-            .json(outputPath);
+        List<String> funderList = relation
+            .select("target")
+            .map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
+            .distinct()
+            .collectAsList();
+
+//        Dataset<CommunityResult> results = result
+//            .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
+//            .map((MapFunction<Tuple2<CommunityResult, Relation>, CommunityResult>) value -> {
+//                return value._1();
+//            }, Encoders.bean(CommunityResult.class));
+
+        funderList.forEach(funder -> writeFunderResult(funder, result, outputPath));
 
     }
 
+    private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String
+        outputPath) {
+
+        results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
+            if (!Optional.ofNullable(r.getProjects()).isPresent()) {
+                return null;
+            }
+            for (Project p : r.getProjects()) {
+                if (p.getId().startsWith(funder)) {
+                    return r;
+                }
+            }
+            return null;
+        }, Encoders.bean(CommunityResult.class))
+            .filter(Objects::nonNull)
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath + "/" + funder);
+    }
+
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
index 6d059567c5..e5d7e2254f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkResultLinkedToProject.java
@@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -70,15 +71,25 @@ public class SparkResultLinkedToProject implements Serializable {
 
     private static <R extends Result> void writeResultsLikedToProjects(SparkSession spark, Class<R> inputClazz,
         String inputPath, String outputPath, String relationPath) {
-        Dataset<R> results = Utils.readPath(spark, inputPath, inputClazz);
+        Dataset<R> results = Utils
+            .readPath(spark, inputPath, inputClazz)
+            .filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
         Dataset<Relation> relations = Utils
             .readPath(spark, relationPath, Relation.class)
-            .filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
+            .filter("dataInfo.deletedbyinference = false and lower(relClass) = 'isproducedby'");
+
         relations
             .joinWith(
-                results, relations.col("target").equalTo(results.col("id")),
+                results, relations.col("source").equalTo(results.col("id")),
                 "inner")
-            .map((MapFunction<Tuple2<Relation, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
+            .groupByKey(
+                (MapFunction<Tuple2<Relation, R>, String>) value -> value
+                    ._2()
+                    .getId(),
+                Encoders.STRING())
+            .mapGroups((MapGroupsFunction<String, Tuple2<Relation, R>, R>) (k, it) -> {
+                return it.next()._2();
+            }, Encoders.bean(inputClazz))
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
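
Note: the SparkDumpFunderResults change replaces the single partitionBy("funder_id") write with one filtered write per funder: the distinct 15-character funder prefixes are collected from the relation targets on the driver, then the result dataset is filtered and dumped once per prefix. A minimal, self-contained sketch of that pattern follows; the FunderRecord bean, the projectId field, and the input/output paths are illustrative stand-ins, not the project's API.

    import java.io.Serializable;
    import java.util.List;

    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class PerFunderDumpSketch {

        // Toy stand-in for CommunityResult (hypothetical bean); only the project id matters here.
        public static class FunderRecord implements Serializable {
            private String projectId; // e.g. "40|corda__h2020::...", funder prefix = first 15 chars
            public String getProjectId() { return projectId; }
            public void setProjectId(String projectId) { this.projectId = projectId; }
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("perFunderDump").master("local[*]").getOrCreate();

            Dataset<FunderRecord> records = spark.read().json(args[0]).as(Encoders.bean(FunderRecord.class));

            // 1. Collect the distinct funder prefixes on the driver; safe because the set
            //    of funders is small, unlike the set of results.
            List<String> funders = records
                .map((MapFunction<FunderRecord, String>) r -> r.getProjectId().substring(0, 15), Encoders.STRING())
                .distinct()
                .collectAsList();

            // 2. One gzip-compressed JSON dump per funder, mirroring writeFunderResult above.
            for (String funder : funders) {
                records
                    .filter((FilterFunction<FunderRecord>) r -> r.getProjectId().startsWith(funder))
                    .write()
                    .mode(SaveMode.Overwrite)
                    .option("compression", "gzip")
                    .json(args[1] + "/" + funder);
            }

            spark.stop();
        }
    }

The trade-off of this design is one full pass over the results per funder in exchange for one output directory per funder; it is reasonable when the funder list is short and the results dataset is cached or cheap to rescan.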
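Note: in SparkResultLinkedToProject, a result produced by several projects matches several isProducedBy relations, so the plain inner join emits it once per project; the added groupByKey/mapGroups keeps a single copy per result id. A minimal sketch of that dedup-after-join pattern, with hypothetical Res and Rel beans standing in for the project's types:

    import java.io.Serializable;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.MapGroupsFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import scala.Tuple2;

    public class DedupAfterJoinSketch {

        // Toy stand-in for a result (hypothetical bean).
        public static class Res implements Serializable {
            private String id;
            public String getId() { return id; }
            public void setId(String id) { this.id = id; }
        }

        // Toy stand-in for an isProducedBy relation: source is the result id.
        public static class Rel implements Serializable {
            private String source;
            public String getSource() { return source; }
            public void setSource(String source) { this.source = source; }
        }

        // A result linked to n projects matches n relations, so the join yields n copies;
        // grouping by result id and keeping one element per group removes the duplicates.
        static Dataset<Res> resultsLinkedToSomeProject(Dataset<Rel> rels, Dataset<Res> results) {
            return rels
                .joinWith(results, rels.col("source").equalTo(results.col("id")), "inner")
                .groupByKey((MapFunction<Tuple2<Rel, Res>, String>) t -> t._2().getId(), Encoders.STRING())
                .mapGroups(
                    (MapGroupsFunction<String, Tuple2<Rel, Res>, Res>) (key, it) -> it.next()._2(),
                    Encoders.bean(Res.class));
        }
    }

Taking it.next() keeps an arbitrary representative per group, which is exactly what the patch's mapGroups does; a dropDuplicates("id") after extracting the result side would be an equivalent alternative.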