diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala index 5f1e63ca8..5d8da42c2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCleanRelation.scala @@ -46,22 +46,26 @@ class SparkCleanRelation(parser: ArgumentApplicationParser, spark: SparkSession) val entities = Seq("datasource", "project", "organization", "publication", "dataset", "software", "otherresearchproduct") + val idsSchema = StructType.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>") + val emptyIds = spark.createDataFrame(spark.sparkContext.emptyRDD[Row].setName("empty"), - new StructType().add(StructField("id", DataTypes.StringType, true))) + idsSchema) val ids = entities .foldLeft(emptyIds)((ds, entity) => { val entityPath = graphBasePath + '/' + entity if (HdfsSupport.exists(entityPath, spark.sparkContext.hadoopConfiguration)) { - ds.union(spark.read.schema("`id` STRING").json(entityPath)) + ds.union(spark.read.schema(idsSchema).json(entityPath)) } else { ds } }) + .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true") + .select("id") .distinct() val relations = spark.read.schema(Encoders.bean(classOf[Relation]).schema).json(inputPath) - .filter(col("dataInfo.deletedbyinference").isNull || col("dataInfo.deletedbyinference") === false) + .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true") AbstractSparkAction.save( relations