From e9aca6b702006a3527fc053ea57d9cc9e4484df8 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 4 Aug 2023 19:32:16 +0200
Subject: [PATCH] refactoring

---
 .../SparkSelectValidRelationsJob.java         | 115 ++++--------------
 1 file changed, 24 insertions(+), 91 deletions(-)

diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java
index 73ebc21..d188685 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java
@@ -4,15 +4,18 @@ package eu.dnetlib.dhp.oa.graph.dump.complete;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+
+import org.apache.spark.sql.*;
+
+import org.apache.spark.sql.types.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,99 +63,29 @@ public class SparkSelectValidRelationsJob implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath);
-				selectValidRelation(spark, inputPath, outputPath);
+				selectValidRelation2(spark, inputPath, outputPath);
 			});
 	}
 
-	private static void selectValidRelation(SparkSession spark, String inputPath, String outputPath) {
-		Dataset<Relation> relation = Utils
-			.readPath(spark, inputPath + "/relation", Relation.class)
-			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
-		Dataset<Publication> publication = Utils
-			.readPath(spark, inputPath + "/publication", Publication.class)
-			.filter(
-				(FilterFunction<Publication>) p -> !p.getDataInfo().getDeletedbyinference()
-					&& !p.getDataInfo().getInvisible());
-		Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = Utils
-			.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
-			.filter(
-				(FilterFunction<eu.dnetlib.dhp.schema.oaf.Dataset>) d -> !d.getDataInfo().getDeletedbyinference()
-					&& !d.getDataInfo().getInvisible());
-		;
-		Dataset<Software> software = Utils
-			.readPath(spark, inputPath + "/software", Software.class)
-			.filter(
-				(FilterFunction<Software>) s -> !s.getDataInfo().getDeletedbyinference()
-					&& !s.getDataInfo().getInvisible());
-		Dataset<OtherResearchProduct> other = Utils
-			.readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
-			.filter(
-				(FilterFunction<OtherResearchProduct>) o -> !o.getDataInfo().getDeletedbyinference()
-					&& !o.getDataInfo().getInvisible());
-		Dataset<Organization> organization = Utils
-			.readPath(spark, inputPath + "/organization", Organization.class)
-			.filter(
-				(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference()
-					&& !o.getDataInfo().getInvisible());
-		Dataset<Project> project = Utils
-			.readPath(spark, inputPath + "/project", Project.class)
-			.filter(
-				(FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference()
-					&& !p.getDataInfo().getInvisible());
-		Dataset<Datasource> datasource = Utils
-			.readPath(spark, inputPath + "/datasource", Datasource.class)
-			.filter(
-				(FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference()
-					&& !d.getDataInfo().getInvisible());
+	private static void selectValidRelation2(SparkSession spark, String inputPath, String outputPath) {
+		final StructType
+			structureSchema = new StructType().fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
 
-		relation.createOrReplaceTempView("relation");
-		publication.createOrReplaceTempView("publication");
-		dataset.createOrReplaceTempView("dataset");
-		other.createOrReplaceTempView("other");
-		software.createOrReplaceTempView("software");
-		organization.createOrReplaceTempView("organization");
-		project.createOrReplaceTempView("project");
-		datasource.createOrReplaceTempView("datasource");
+		org.apache.spark.sql.Dataset<Row> df = spark.createDataFrame(new ArrayList<Row>(), structureSchema);
+		List<String> entities = Arrays.asList("publication", "dataset", "otherresearchproduct", "software", "organization", "project", "datasource");
+		for (String e : entities)
+			df = df.union(spark.read().schema(structureSchema).json(inputPath + "/" + e).filter("dataInfo.deletedbyinference != true"));
 
-		spark
-			.sql(
-				"SELECT id " +
-					"FROM publication " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM dataset " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM other " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM software " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM organization " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM project " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM datasource ")
-			.createOrReplaceTempView("identifiers");
-
-		spark
-			.sql(
-				"SELECT relation.* " +
-					"FROM relation " +
-					"JOIN identifiers i1 " +
-					"ON source = i1.id " +
-					"JOIN identifiers i2 " +
-					"ON target = i2.id ")
-			.as(Encoders.bean(Relation.class))
-			.write()
-			.option("compression", "gzip")
-			.mode(SaveMode.Overwrite)
-			.json(outputPath);
+		org.apache.spark.sql.Dataset<Row> relations = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputPath + "/relation")
+			.filter("dataInfo.deletedbyinference == false");
+		relations.join(df, relations.col("source").equalTo(df.col("id")), "leftsemi")
+			.join(df, relations.col("target").equalTo(df.col("id")), "leftsemi")
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
 	}
 
 }
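What the refactoring does: instead of deserializing every entity into its full OAF bean and routing the validity check through Spark SQL temp views, selectValidRelation2 reads only the `id` and `dataInfo` columns of each entity dump under an explicit schema, unions the identifiers of everything not deleted by inference, and keeps a relation only when both its source and target ids survive, via two successive left-semi joins. Below is a minimal, self-contained sketch of the same pattern; the SparkSession setup, the /tmp/graph paths, the reduced entity list, and the class name are illustrative assumptions, not part of the patch:

    import java.util.ArrayList;
    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructType;

    public class SelectValidRelationsSketch {

        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("select-valid-relations-sketch")
                .master("local[*]") // assumption: local execution, for illustration only
                .getOrCreate();

            // Project each entity dump down to the two fields the validity check needs.
            StructType idSchema = StructType
                .fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");

            // Start from an empty DataFrame and union in the ids of every non-deleted entity.
            Dataset<Row> ids = spark.createDataFrame(new ArrayList<Row>(), idSchema);
            for (String e : Arrays.asList("publication", "dataset")) { // reduced entity list, for brevity
                ids = ids
                    .union(
                        spark
                            .read()
                            .schema(idSchema)
                            .json("/tmp/graph/" + e) // hypothetical input layout
                            .filter("dataInfo.deletedbyinference != true"));
            }

            // Relations are read as plain rows here; the patch itself applies
            // Encoders.bean(Relation.class).schema() to enforce the Relation layout.
            Dataset<Row> relations = spark
                .read()
                .json("/tmp/graph/relation") // hypothetical input layout
                .filter("dataInfo.deletedbyinference == false");

            // A left-semi join keeps a relation iff a matching id exists on the right,
            // without adding any columns from `ids`; doing it twice validates both endpoints.
            relations
                .join(ids, relations.col("source").equalTo(ids.col("id")), "leftsemi")
                .join(ids, relations.col("target").equalTo(ids.col("id")), "leftsemi")
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json("/tmp/graph/validRelation"); // hypothetical output path

            spark.stop();
        }
    }

A left-semi join emits each left-side row at most once and carries over no columns from the right side, so the result needs no projection or deduplication afterwards; chaining two of them is equivalent to the previous double JOIN against the identifiers view, while the schema-projected read avoids materializing the full entity beans.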