diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java index 232459c19..e2d8d7cb1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/funderresults/SparkDumpFunderResults.java @@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.*; +import eu.dnetlib.dhp.oa.graph.dump.Constants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -72,7 +73,7 @@ public class SparkDumpFunderResults implements Serializable { Dataset relation = Utils .readPath(spark, relationPath + "/relation", Relation.class) - .filter("dataInfo.deletedbyinference = false and relClass = 'isProducedBy'"); + .filter("dataInfo.deletedbyinference = false and lower(relClass) = '" + Constants.RESULT_PROJECT_IS_PRODUCED_BY.toLowerCase()+ "'"); Dataset result = Utils .readPath(spark, inputPath + "/publication", CommunityResult.class) @@ -86,13 +87,22 @@ public class SparkDumpFunderResults implements Serializable { .distinct() .collectAsList(); -// Dataset results = result -// .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner") -// .map((MapFunction, CommunityResult>) value -> { -// return value._1(); -// }, Encoders.bean(CommunityResult.class)); - funderList.forEach(funder -> writeFunderResult(funder, result, outputPath)); + funderList.forEach(funder -> { + String fundernsp = funder.substring(3); + String funderdump; + if (fundernsp.startsWith("corda")){ + funderdump = "EC_"; + if(fundernsp.endsWith("h2020")){ + funderdump += "H2020"; + }else{ + funderdump += "FP7"; + } + }else{ + funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase(); + } + writeFunderResult(funder, result, outputPath + "/" + funderdump); + }); } @@ -113,7 +123,7 @@ public class SparkDumpFunderResults implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "/" + funder); + .json(outputPath); } }