refactoring

Miriam Baglioni 2023-08-04 19:32:16 +02:00
parent 5fb58362c5
commit e9aca6b702
1 changed file with 24 additions and 91 deletions


@@ -4,15 +4,18 @@ package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,99 +63,29 @@ public class SparkSelectValidRelationsJob implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
-				selectValidRelation(spark, inputPath, outputPath);
+				selectValidRelation2(spark, inputPath, outputPath);
});
}
-	private static void selectValidRelation(SparkSession spark, String inputPath, String outputPath) {
-		Dataset<Relation> relation = Utils
-			.readPath(spark, inputPath + "/relation", Relation.class)
-			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
-		Dataset<Publication> publication = Utils
-			.readPath(spark, inputPath + "/publication", Publication.class)
-			.filter(
-				(FilterFunction<Publication>) p -> !p.getDataInfo().getDeletedbyinference()
-					&& !p.getDataInfo().getInvisible());
-		Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = Utils
-			.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
-			.filter(
-				(FilterFunction<eu.dnetlib.dhp.schema.oaf.Dataset>) d -> !d.getDataInfo().getDeletedbyinference()
-					&& !d.getDataInfo().getInvisible());
-		;
-		Dataset<Software> software = Utils
-			.readPath(spark, inputPath + "/software", Software.class)
-			.filter(
-				(FilterFunction<Software>) s -> !s.getDataInfo().getDeletedbyinference()
-					&& !s.getDataInfo().getInvisible());
-		Dataset<OtherResearchProduct> other = Utils
-			.readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class)
-			.filter(
-				(FilterFunction<OtherResearchProduct>) o -> !o.getDataInfo().getDeletedbyinference()
-					&& !o.getDataInfo().getInvisible());
-		Dataset<Organization> organization = Utils
-			.readPath(spark, inputPath + "/organization", Organization.class)
-			.filter(
-				(FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference()
-					&& !o.getDataInfo().getInvisible());
-		Dataset<Project> project = Utils
-			.readPath(spark, inputPath + "/project", Project.class)
-			.filter(
-				(FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference()
-					&& !p.getDataInfo().getInvisible());
-		Dataset<Datasource> datasource = Utils
-			.readPath(spark, inputPath + "/datasource", Datasource.class)
-			.filter(
-				(FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference()
-					&& !d.getDataInfo().getInvisible());
+	private static void selectValidRelation2(SparkSession spark, String inputPath, String outputPath) {
+		final StructType structureSchema = new StructType()
+			.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
-		relation.createOrReplaceTempView("relation");
-		publication.createOrReplaceTempView("publication");
-		dataset.createOrReplaceTempView("dataset");
-		other.createOrReplaceTempView("other");
-		software.createOrReplaceTempView("software");
-		organization.createOrReplaceTempView("organization");
-		project.createOrReplaceTempView("project");
-		datasource.createOrReplaceTempView("datasource");
+		org.apache.spark.sql.Dataset<Row> df = spark.createDataFrame(new ArrayList<Row>(), structureSchema);
+		List<String> entities = Arrays
+			.asList("publication", "dataset", "otherresearchproduct", "software", "organization", "project", "datasource");
+		for (String e : entities)
+			df = df
+				.union(spark.read().schema(structureSchema).json(inputPath + "/" + e)
+					.filter("dataInfo.deletedbyinference != true"));
-		spark
-			.sql(
-				"SELECT id " +
-					"FROM publication " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM dataset " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM other " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM software " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM organization " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM project " +
-					"UNION ALL " +
-					"SELECT id " +
-					"FROM datasource ")
-			.createOrReplaceTempView("identifiers");
-		spark
-			.sql(
-				"SELECT relation.* " +
-					"FROM relation " +
-					"JOIN identifiers i1 " +
-					"ON source = i1.id " +
-					"JOIN identifiers i2 " +
-					"ON target = i2.id ")
-			.as(Encoders.bean(Relation.class))
-			.write()
-			.option("compression", "gzip")
-			.mode(SaveMode.Overwrite)
-			.json(outputPath);
+		org.apache.spark.sql.Dataset<Row> relations = spark
+			.read()
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(inputPath + "/relation")
+			.filter("dataInfo.deletedbyinference == false");
+		relations
+			.join(df, relations.col("source").equalTo(df.col("id")), "leftsemi")
+			.join(df, relations.col("target").equalTo(df.col("id")), "leftsemi")
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
}
}
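
For context, the sketch below condenses the pattern this commit introduces into a standalone job: read each entity type with an explicit minimal schema (only id and the dataInfo flags), union the surviving identifiers, and keep a relation only when both source and target appear in that set, using left-semi joins. It is a minimal sketch, not the committed code: the class name, local master, default paths and the simplified relation schema are illustrative assumptions, while the actual job derives its relation schema from Encoders.bean(Relation.class) in the dnet-hadoop model.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class SelectValidRelationsSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("valid-relations-sketch").getOrCreate();

		// Hypothetical layout: one JSON dump per entity type under <inputPath>/<entity>.
		String inputPath = args.length > 0 ? args[0] : "/tmp/graph";
		String outputPath = args.length > 1 ? args[1] : "/tmp/validRelation";

		// Project only the fields needed for the validity check.
		StructType idSchema = StructType
			.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");

		// Simplified relation schema for the sketch; the real job uses the full Relation bean schema.
		StructType relationSchema = StructType
			.fromDDL("`source` STRING, `target` STRING, `relClass` STRING, " +
				"`dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");

		// Union the identifiers of every entity type, skipping records deleted by inference.
		Dataset<Row> validIds = spark.createDataFrame(new ArrayList<Row>(), idSchema);
		List<String> entities = Arrays
			.asList("publication", "dataset", "otherresearchproduct", "software", "organization", "project", "datasource");
		for (String e : entities)
			validIds = validIds
				.union(spark.read().schema(idSchema).json(inputPath + "/" + e)
					.filter("dataInfo.deletedbyinference != true"));

		Dataset<Row> relations = spark
			.read()
			.schema(relationSchema)
			.json(inputPath + "/relation")
			.filter("dataInfo.deletedbyinference == false");

		// Left-semi joins behave like an EXISTS check on source and target and
		// return only the relation's own columns.
		relations
			.join(validIds, relations.col("source").equalTo(validIds.col("id")), "leftsemi")
			.join(validIds, relations.col("target").equalTo(validIds.col("id")), "leftsemi")
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);

		spark.stop();
	}
}

Because the semi joins keep only the left side's columns, the result already has the relation shape and can be written directly, which is why the refactored method no longer needs temporary views or a final SELECT over them.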