
[Person] remove the isolated nodes from the person set

Miriam Baglioni 2024-10-25 10:05:17 +02:00
parent 32f444984e
commit 1fce7d5a0f
1 changed file with 25 additions and 3 deletions

@@ -16,10 +16,8 @@ import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,9 +82,33 @@ public class SparkExtractPersonRelations {
 				spark,
 				sourcePath,
 				workingPath);
+			removeIsolatedPerson(spark, sourcePath, workingPath);
 			});
 	}
+
+	private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) {
+		Dataset<Person> personDataset = spark
+			.read()
+			.schema(Encoders.bean(Person.class).schema())
+			.json(sourcePath + "person")
+			.as(Encoders.bean(Person.class));
+		Dataset<Relation> relationDataset = spark
+			.read()
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(sourcePath + "relation")
+			.as(Encoders.bean(Relation.class));
+
+		// keep only the persons that appear as the source of at least one relation
+		personDataset
+			.join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi")
+			.write()
+			.option("compression", "gzip")
+			.mode(SaveMode.Overwrite)
+			.json(workingPath + "person");
+
+		// stage the filtered set under the working path, then write it back over
+		// the source path (Spark cannot safely overwrite a path it is still reading from)
+		spark
+			.read()
+			.schema(Encoders.bean(Person.class).schema())
+			.json(workingPath + "person")
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(sourcePath + "person");
+	}
+
 	private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) {
 		Dataset<Tuple2<String, Relation>> relationDataset = spark
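
The core of the new method is Spark's left_semi join: it returns only the rows of the left dataset that have at least one match on the right, keeping the left-hand schema and emitting each surviving row at most once, which is exactly the semantics needed to drop persons that are the source of no relation. Below is a minimal, self-contained sketch of that behaviour; the class name, the local master, and the toy data are hypothetical stand-ins, not the project's Person/Relation beans.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LeftSemiSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("left-semi-sketch")
			.master("local[*]")
			.getOrCreate();

		// Hypothetical stand-ins for the person and relation inputs.
		Dataset<Row> persons = spark
			.createDataset(Arrays.asList("p1", "p2", "p3"), Encoders.STRING())
			.toDF("id");
		Dataset<Row> relations = spark
			.createDataset(Arrays.asList("p1", "p1", "p3"), Encoders.STRING())
			.toDF("source");

		// left_semi keeps a person row iff at least one relation has it as source;
		// p2 has no relation, so only p1 and p3 survive.
		persons
			.join(relations, persons.col("id").equalTo(relations.col("source")), "left_semi")
			.show();

		spark.stop();
	}
}

Run locally, this prints only p1 and p3; p2, the isolated node, is filtered out, and the result carries no columns from the relation side.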
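A second detail worth noting is the two-step write at the end of removeIsolatedPerson: the filtered persons are first materialized under workingPath and only then read back and written over sourcePath + "person". Because Spark evaluates lazily, overwriting a path that still serves as the job's input would delete the source files before they are fully consumed (when Spark detects this it typically fails with "Cannot overwrite a path that is also being read from"), so staging the result in a scratch directory and copying it back is presumably the reason for the extra pass over the data.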