From b176bbef1d23cf4a68d76d7d991020a526cd1711 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Thu, 14 Mar 2024 10:11:45 +0100
Subject: [PATCH] [SKG-IF] fixing issue in deserialization

---
 .../SelectConnectedEntities.java              | 24 +++++++++----------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java
index b191e7e..4d2b0ec 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectConnectedEntities.java
@@ -4,8 +4,10 @@ package eu.dnetlib.dhp.oa.graph.dump.filterentities;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -105,7 +107,7 @@ public class SelectConnectedEntities implements Serializable {
 			.joinWith(relation, resultIds.col("id").equalTo(relation.col("source")))
 			.map((MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
 
-		// write relations having sorce and target in the set
+		// write relations having source and target in the set
 		resultIds
 			.joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target")))
 			.map((MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class))
@@ -153,11 +155,11 @@ public class SelectConnectedEntities implements Serializable {
 			.json(workingDir + "project");
 
 		// read the results and select all the distinct instance.hostedbykey
-		Dataset<String> hostedbyIds = spark.emptyDataset(Encoders.STRING());
+		Dataset<String> datasourceReferencedIds = spark.emptyDataset(Encoders.STRING());
 		for (EntityType entity : ModelSupport.entityTypes.keySet())
 			if (ModelSupport.isResult(entity)) {
 				Class<R> resultClazz = ModelSupport.entityTypes.get(entity);
-				hostedbyIds = hostedbyIds
+				datasourceReferencedIds = datasourceReferencedIds
 					.union(
 						Utils
 							.readPath(spark, workingDir + entity.name(), resultClazz)
@@ -165,14 +167,15 @@ public class SelectConnectedEntities implements Serializable {
 							.flatMap(
 								(FlatMapFunction<R, String>) r -> r
 									.getInstance()
 									.stream()
-									.map(i -> i.getHostedby().getKey())
+									.flatMap(i -> Stream.of(i.getHostedby().getKey(), i.getCollectedfrom().getKey()))
 									.collect(Collectors.toList())
 									.iterator(),
 								Encoders.STRING()));
 			}
+		datasourceReferencedIds = datasourceReferencedIds.distinct();
 		// join with the datasources and write the datasource in the join
-		hostedbyIds
-			.joinWith(datasources, hostedbyIds.col("value").equalTo(datasources.col("id")))
+		datasourceReferencedIds
+			.joinWith(datasources, datasourceReferencedIds.col("value").equalTo(datasources.col("id")))
 			.map((MapFunction<Tuple2<String, Datasource>, Datasource>) t2 -> t2._2(), Encoders.bean(Datasource.class))
 			.write()
 			.mode(SaveMode.Overwrite)
@@ -196,20 +199,15 @@ public class SelectConnectedEntities implements Serializable {
 		// selecting relations between datasources and organizations in the selected set
 		Dataset<Datasource> datasourceSbs = Utils.readPath(spark, workingDir + "datasource", Datasource.class);
 		Dataset<Relation> dsSourceRels = datasourceSbs
-			.joinWith(relation, datasourceSbs.col("id").equalTo(relation.col("source")))
+			.joinWith(relation, datasourceSbs.col("id").as("dsId").equalTo(relation.col("source")))
 			.map((MapFunction<Tuple2<Datasource, Relation>, Relation>) t2 -> t2._2(),
 				Encoders.bean(Relation.class));
 		dsSourceRels
-			.joinWith(organizationSbs, dsSourceRels.col("target").equalTo(organizations.col("id")))
+			.joinWith(organizationSbs, dsSourceRels.col("target").equalTo(organizations.col("id").as("orgId")))
 			.map((MapFunction<Tuple2<Relation, Organization>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
 			.write()
 			.mode(SaveMode.Append)
 			.option("compression", "gzip")
 			.json(workingDir + "relation");
-		/**
-		 * DATASOURCE_PROVIDED_BY_ORGANIZATION(
-		 * "isProvidedBy"),
-		 */
-
 	}
 }
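
The substance of the change is the id-collection step: each result instance now contributes both its hostedby key and its collectedfrom key, and the union is deduplicated before the join with the datasource table. Below is a minimal, self-contained sketch of that logic in plain Java streams; Instance and KeyValue here are hypothetical stand-ins for the dnet-hadoop model beans, not the real classes.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DatasourceIdCollector {

	// Hypothetical stand-in for the dnet-hadoop KeyValue bean
	record KeyValue(String key, String value) {
	}

	// Hypothetical stand-in for the result Instance bean
	record Instance(KeyValue hostedby, KeyValue collectedfrom) {
	}

	// Mirrors the patched lambda: one instance now yields two datasource ids
	// (hostedby and collectedfrom) instead of only the hostedby key.
	static List<String> referencedDatasourceIds(List<Instance> instances) {
		return instances
			.stream()
			.flatMap(i -> Stream.of(i.hostedby().key(), i.collectedfrom().key()))
			.distinct() // plays the role of datasourceReferencedIds.distinct()
			.collect(Collectors.toList());
	}

	public static void main(String[] args) {
		List<Instance> instances = List.of(
			new Instance(new KeyValue("ds1", "Datasource 1"), new KeyValue("ds2", "Datasource 2")),
			new Instance(new KeyValue("ds1", "Datasource 1"), new KeyValue("ds3", "Datasource 3")));
		System.out.println(referencedDatasourceIds(instances)); // prints [ds1, ds2, ds3]
	}
}

Without the distinct step, a datasource referenced by many instances would enter the join once per reference; deduplicating first keeps the datasource output free of duplicates while still retaining the collecting datasources that the old hostedby-only mapping dropped.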