[SKG-IF] fixing issue in deserialization

This commit is contained in:
Miriam Baglioni 2024-03-14 15:27:43 +01:00
parent 187b91a699
commit 3126907d09
1 changed file with 2 additions and 12 deletions

View File

@@ -88,8 +88,6 @@ public class SelectConnectedEntities implements Serializable {
.select("id")
.as(Encoders.STRING()));
// log.info("Number of identifiers in the result {}", resultIds.count());
log.info("Deserialization of the first id {}", new ObjectMapper().writeValueAsString(resultIds.first()));
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "relation", Relation.class)
.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
@@ -111,25 +109,17 @@ public class SelectConnectedEntities implements Serializable {
// select relations having source in the set of identifiers selected for eosc
Dataset<Relation> resultSource = resultIds
.joinWith(relation, resultIds.col("_1").equalTo(relation.col("source")))
.joinWith(relation, resultIds.col("value").equalTo(relation.col("source")))
.map((MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
// write relations having source and target in the set
resultIds
.joinWith(resultSource, resultIds.col("_1").equalTo(resultSource.col("target")))
.joinWith(resultSource, resultIds.col("value").equalTo(resultSource.col("target")))
.map((MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingDir + "relation");
log
.info(
"Number of relations in the result {}", resultIds
.joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target")))
.map(
(MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(),
Encoders.bean(Relation.class))
.count());
// write relations between results and organizations
resultSource