refactoring and addition of the funder nsp first part as name for the dump instead of the whole nsp

This commit is contained in:
Miriam Baglioni 2020-11-25 13:45:41 +01:00
parent e7e418e444
commit 87a9f616ae
1 changed files with 18 additions and 8 deletions

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.*; import java.util.*;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -72,7 +73,7 @@ public class SparkDumpFunderResults implements Serializable {
Dataset<Relation> relation = Utils Dataset<Relation> relation = Utils
.readPath(spark, relationPath + "/relation", Relation.class) .readPath(spark, relationPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'isProducedBy'"); .filter("dataInfo.deletedbyinference = false and lower(relClass) = '" + Constants.RESULT_PROJECT_IS_PRODUCED_BY.toLowerCase()+ "'");
Dataset<CommunityResult> result = Utils Dataset<CommunityResult> result = Utils
.readPath(spark, inputPath + "/publication", CommunityResult.class) .readPath(spark, inputPath + "/publication", CommunityResult.class)
@ -86,13 +87,22 @@ public class SparkDumpFunderResults implements Serializable {
.distinct() .distinct()
.collectAsList(); .collectAsList();
// Dataset<CommunityResult> results = result
// .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
// .map((MapFunction<Tuple2<CommunityResult, Relation>, CommunityResult>) value -> {
// return value._1();
// }, Encoders.bean(CommunityResult.class));
funderList.forEach(funder -> writeFunderResult(funder, result, outputPath)); funderList.forEach(funder -> {
String fundernsp = funder.substring(3);
String funderdump;
if (fundernsp.startsWith("corda")){
funderdump = "EC_";
if(fundernsp.endsWith("h2020")){
funderdump += "H2020";
}else{
funderdump += "FP7";
}
}else{
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
}
writeFunderResult(funder, result, outputPath + "/" + funderdump);
});
} }
@ -113,7 +123,7 @@ public class SparkDumpFunderResults implements Serializable {
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + "/" + funder); .json(outputPath);
} }
} }