Add a "CleanRelation" action after the PropagateRelation to filter out all relations that have been deleted by inference or that are pointing to dangling entities #328
|
@ -46,22 +46,26 @@ class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession)
|
|||
val entities =
|
||||
Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct")
|
||||
|
||||
val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>")
|
||||
|
||||
val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"),
|
||||
new StructType().add(StructField("id", DataTypes.StringType, true)))
|
||||
idsSchema)
|
||||
|
||||
val ids = entities
|
||||
.foldLeft(emptyIds)((ds, entity) => {
|
||||
val entityPath = graphBasePath + '/' + entity
|
||||
if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) {
|
||||
ds.union(spark.read.schema("`id` STRING").json(entityPath))
|
||||
ds.union(spark.read.schema(idsSchema).json(entityPath))
|
||||
} else {
|
||||
ds
|
||||
}
|
||||
})
|
||||
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
||||
.select("id")
|
||||
.distinct()
|
||||
|
||||
val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath)
|
||||
.filter(col("dataInfo.deletedbyinference").isNull || col("dataInfo.deletedbyinference") === false)
|
||||
.filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
|
||||
|
||||
AbstractSparkAction.save(
|
||||
relations
|
||||
|
|
Loading…
Reference in New Issue