|
|
|
@ -76,16 +76,16 @@ public class SparkPrepareResultProject implements Serializable {
|
|
|
|
|
private static void prepareResultProjectList(SparkSession spark, String inputPath, String outputPath) {
|
|
|
|
|
Dataset<Relation> relation = Utils
|
|
|
|
|
.readPath(spark, inputPath + "/relation", Relation.class)
|
|
|
|
|
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
|
|
|
|
|
.filter("dataInfo.deletedbyinference = false and lower(relClass) = 'isproducedby'");
|
|
|
|
|
Dataset<eu.dnetlib.dhp.schema.oaf.Project> projects = Utils
|
|
|
|
|
.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
|
|
|
|
|
|
|
|
|
|
projects
|
|
|
|
|
.joinWith(relation, projects.col("id").equalTo(relation.col("source")))
|
|
|
|
|
.joinWith(relation, projects.col("id").equalTo(relation.col("target")), "inner")
|
|
|
|
|
.groupByKey(
|
|
|
|
|
(MapFunction<Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, String>) value -> value
|
|
|
|
|
._2()
|
|
|
|
|
.getTarget(),
|
|
|
|
|
.getSource(),
|
|
|
|
|
Encoders.STRING())
|
|
|
|
|
.mapGroups(
|
|
|
|
|
(MapGroupsFunction<String, Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation>, ResultProject>) (s,
|
|
|
|
@ -93,7 +93,7 @@ public class SparkPrepareResultProject implements Serializable {
|
|
|
|
|
Set<String> projectSet = new HashSet<>();
|
|
|
|
|
Tuple2<eu.dnetlib.dhp.schema.oaf.Project, Relation> first = it.next();
|
|
|
|
|
ResultProject rp = new ResultProject();
|
|
|
|
|
rp.setResultId(first._2().getTarget());
|
|
|
|
|
rp.setResultId(s);
|
|
|
|
|
eu.dnetlib.dhp.schema.oaf.Project p = first._1();
|
|
|
|
|
projectSet.add(p.getId());
|
|
|
|
|
Project ps = getProject(p);
|
|
|
|
|