From 1fce7d5a0f467b478993d408c1103dbd7d895acf Mon Sep 17 00:00:00 2001
From: Miriam Baglioni
Date: Fri, 25 Oct 2024 10:05:17 +0200
Subject: [PATCH] [Person] remove the isolated nodes from the person set

---
 .../person/SparkExtractPersonRelations.java  | 28 +++++++++++++++++--
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java
index 6caeef478b..3892498dfd 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java
@@ -16,10 +16,8 @@ import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,9 +82,33 @@ public class SparkExtractPersonRelations {
 						spark,
 						sourcePath,
 						workingPath);
+					removeIsolatedPerson(spark,sourcePath, workingPath);
 			});
 	}
 
+	private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) {
+		Dataset<Person> personDataset = spark.read().schema(Encoders.bean(Person.class).schema())
+			.json(sourcePath + "person")
+			.as(Encoders.bean(Person.class));
+
+		Dataset<Relation> relationDataset = spark.read().schema(Encoders.bean(Relation.class).schema())
+			.json(sourcePath + "relation")
+			.as(Encoders.bean(Relation.class));
+
+		personDataset.join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi")
+			.write()
+			.option("compression","gzip")
+			.mode(SaveMode.Overwrite)
+			.json(workingPath + "person");
+
+		spark.read().schema(Encoders.bean(Person.class).schema())
+			.json(workingPath + "person")
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression","gzip")
+			.json(sourcePath + "person");
+	}
+
 	private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) {
 
 		Dataset<Tuple2<String, Relation>> relationDataset = spark
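
For context, the removeIsolatedPerson step added above keeps a Person record only if its id appears as the source of at least one relation, using Spark's "left_semi" join (which filters the left side without pulling in columns from the right), and it stages the filtered output under workingPath before copying it back over sourcePath + "person". Below is a minimal standalone sketch of the same filtering pattern; it is not the project's code and assumes untyped DataFrames, a local SparkSession, and hypothetical command-line paths in place of the Person/Relation beans and workflow wiring.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class RemoveIsolatedPersonsSketch {
	public static void main(String[] args) {
		// Sketch only: local master and args-based paths are assumptions for illustration.
		SparkSession spark = SparkSession
			.builder()
			.appName("remove-isolated-persons-sketch")
			.master("local[*]")
			.getOrCreate();

		String sourcePath = args[0]; // hypothetical graph directory containing person/ and relation/
		String workingPath = args[1];

		Dataset<Row> persons = spark.read().json(sourcePath + "person");
		Dataset<Row> relations = spark.read().json(sourcePath + "relation");

		// left_semi keeps a person row iff at least one relation has that person's id
		// as its source; no columns from the relation side are added to the result.
		Dataset<Row> connected = persons
			.join(relations, persons.col("id").equalTo(relations.col("source")), "left_semi");

		// Materialize under the working path first, then copy back: overwriting the
		// directory that is still being read as input is not safe in Spark.
		connected
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingPath + "person");

		spark
			.read()
			.json(workingPath + "person")
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(sourcePath + "person");

		spark.stop();
	}
}

The two-phase write mirrors the patch's design choice: the filtered person set is first written to the working area and only then re-read and written over the original person directory, so the source is never overwritten while it is still an input to the running job.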