From 187b91a6991412f6f50866e121ed7b7fa1067116 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 14 Mar 2024 13:02:47 +0100 Subject: [PATCH] [SKG-IF] fixing issue in deserialization --- .../SelectConnectedEntities.java | 30 ++++++++++++++----- .../graph/dump/skgif/oozie_app/workflow.xml | 2 +- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java index 4d2b0ec..83c2fa0 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java @@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.Arrays; import java.util.Optional; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -19,6 +20,9 @@ import org.apache.spark.sql.Dataset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities; import eu.dnetlib.dhp.oa.graph.dump.skgif.Utils; @@ -71,9 +75,9 @@ public class SelectConnectedEntities implements Serializable { private static void selectConnectedEntities(SparkSession spark, String inputPath, String filterPath, - String workingDir) { + String workingDir) throws JsonProcessingException { - Dataset resultIds = spark.emptyDataset(Encoders.bean(Identifiers.class)); + Dataset resultIds = spark.emptyDataset(Encoders.STRING()); for (EntityType entity : ModelSupport.entityTypes.keySet()) if (ModelSupport.isResult(entity)) resultIds = resultIds @@ -81,8 +85,11 @@ public class SelectConnectedEntities implements Serializable { spark .read() .parquet(filterPath + entity.name() + "_ids") - .as(Encoders.bean(Identifiers.class))); + .select("id") + .as(Encoders.STRING())); + // log.info("Number of identifiers in the result {}", resultIds.count()); + log.info("Deserialization of the first id {}", new ObjectMapper().writeValueAsString(resultIds.first())); Dataset relation = Utils .readPath(spark, inputPath + "relation", Relation.class) .filter((FilterFunction) r -> !r.getDataInfo().getDeletedbyinference()); @@ -104,17 +111,25 @@ public class SelectConnectedEntities implements Serializable { // select relations having source in the set of identifiers selected for eosc Dataset resultSource = resultIds - .joinWith(relation, resultIds.col("id").equalTo(relation.col("source"))) - .map((MapFunction, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class)); + .joinWith(relation, resultIds.col("_1").equalTo(relation.col("source"))) + .map((MapFunction, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class)); // write relations having source and target in the set resultIds - .joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target"))) - .map((MapFunction, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class)) + .joinWith(resultSource, resultIds.col("_1").equalTo(resultSource.col("target"))) + .map((MapFunction, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class)) .write() .option("compression", "gzip") .mode(SaveMode.Overwrite) .json(workingDir + "relation"); + log + .info( + "Number of relations in the result {}", resultIds + .joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target"))) + .map( + (MapFunction, Relation>) t2 -> t2._2(), + Encoders.bean(Relation.class)) + .count()); // write relations between results and organizations resultSource @@ -125,7 +140,6 @@ public class SelectConnectedEntities implements Serializable { .option("compression", "gzip") .json(workingDir + "relation"); - // write relations between results and projects resultSource .joinWith(projects, resultSource.col("target").equalTo(projects.col("id"))) .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)) diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml index 04ace0a..1e45dea 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/skgif/oozie_app/workflow.xml @@ -212,7 +212,7 @@ --sourcePath${sourcePath} - --workingDir${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/ + --workingDir/user/miriam.baglioni/oa/graph/dump/temp/graph/ --filterPath${filterPath}