This commit is contained in:
Miriam Baglioni 2020-11-18 17:43:08 +01:00
parent 5402062ff5
commit 906db690d2
1 changed files with 1 additions and 46 deletions

View File

@ -65,7 +65,7 @@ public class SparkPrepareResultProject implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
prepareResultProjectList2(spark, inputPath, outputPath, communityMapPath);
prepareResultProjectList(spark, inputPath, outputPath, communityMapPath);
});
}
@ -115,51 +115,6 @@ public class SparkPrepareResultProject implements Serializable {
.option("compression", "gzip")
.json(outputPath + "/" + funder);
});
}
private static void prepareResultProjectList2(SparkSession spark, String inputPath, String outputPath,
String communityMapPath) {
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
Dataset<eu.dnetlib.dhp.schema.oaf.Result> result = Utils
.readPath(spark, inputPath + "/publication", eu.dnetlib.dhp.schema.oaf.Result.class)
.union(Utils.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Result.class))
.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", eu.dnetlib.dhp.schema.oaf.Result.class))
.union(Utils.readPath(spark, inputPath + "/software", eu.dnetlib.dhp.schema.oaf.Result.class));
result
.joinWith(relation, result.col("id").equalTo(relation.col("target")))
.groupByKey(
(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) value -> value
._2()
.getSource()
.substring(3, 15),
Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation>, String>) (s, it) -> {
Tuple2<eu.dnetlib.dhp.schema.oaf.Result, Relation> first = it.next();
List<Result> resultList = new ArrayList<>();
resultList.add(ResultMapper.map(first._1(), communityMap, true));
it.forEachRemaining(c -> {
resultList.add(ResultMapper.map(c._1(), communityMap, true));
});
spark
.createDataFrame(resultList, Result.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/" + s);
return new String();
}, Encoders.STRING());
}
}