This commit is contained in:
Miriam Baglioni 2020-11-20 12:21:33 +01:00
parent d362f2637d
commit 0a9db67eec
3 changed files with 6 additions and 13 deletions

View File

@ -8,7 +8,10 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -18,6 +21,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils; import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
public class SparkResultLinkedToProject implements Serializable { public class SparkResultLinkedToProject implements Serializable {
@ -74,6 +78,7 @@ public class SparkResultLinkedToProject implements Serializable {
.joinWith( .joinWith(
results, relations.col("target").equalTo(results.col("id")), results, relations.col("target").equalTo(results.col("id")),
"inner") "inner")
.map((MapFunction<Tuple2<Relation, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")

View File

@ -17,12 +17,7 @@
"paramDescription": "true if the spark session is managed, false otherwise", "paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "cmp",
"paramLongName": "communityMapPath",
"paramDescription": "the community map path",
"paramRequired": true
},{
"paramName": "rp", "paramName": "rp",
"paramLongName": "relationPath", "paramLongName": "relationPath",
"paramDescription": "the relationPath", "paramDescription": "the relationPath",

View File

@ -1,11 +1,4 @@
[ [
{
"paramName":"cmp",
"paramLongName":"communityMapPath",
"paramDescription": "the path to the serialization of the community map",
"paramRequired": true
},
{ {
"paramName":"s", "paramName":"s",
"paramLongName":"sourcePath", "paramLongName":"sourcePath",