diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 556106180..362cb2028 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -116,54 +116,45 @@ object SparkConvertRDDtoDataset { .map(s => mapper.readValue(s, classOf[Relation])) .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference) .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) - .filter(r => filterRelations(subRelTypeFilter, relClassFilter, r)) - //filter OpenCitations relations - .filter(r => - r.getDataInfo.getProvenanceaction != null && - !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) - ) + .filter(r => filterRelations(r)) + //filter OpenCitations relations +// .filter(r => +// r.getDataInfo.getProvenanceaction != null && +// !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) +// ) spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") } - private def filterRelations(subRelTypeFilter: String, relClassFilter: List[String], r: Relation): Boolean = { - if (StringUtils.isNotBlank(subRelTypeFilter)) { - subRelTypeFilter.equalsIgnoreCase(r.getSubRelType) - } else { - !relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)) + private def filterRelations(r: Relation): Boolean = { + + /** * + * We filter relation generated by dedups + * and all the relation that have one single collectedFrom OpenCitation + */ + + val relClassFilter = List( + ModelConstants.MERGES, + ModelConstants.IS_MERGED_IN, + ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS, + ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS + ) + if (relClassFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))) + false + else { + if (r.getCollectedfrom == null || r.getCollectedfrom.size() == 0) + false + else if (r.getCollectedfrom.size() > 1) + true + else if ( + r.getCollectedfrom.size() == 1 && r.getCollectedfrom.get(0) != null && "OpenCitations".equalsIgnoreCase( + r.getCollectedfrom.get(0).getValue + ) + ) + false + else + true } } - /* - //TODO: finalise implementation - private def processResult[T<: Result]( - implicit ct: ClassTag[T], - log: Logger, - spark: SparkSession, - sourcePath: String, - entityPath: String, - clazz: Class[T] - ): Unit = { - val entityType = clazz.getSimpleName.toLowerCase - - log.info(s"Converting $entityType") - - val mapper = new ObjectMapper() with ScalaObjectMapper - mapper.registerModule(DefaultScalaModule) - - val rdd = spark.sparkContext - .textFile(s"$sourcePath/$entityType") - .map(s => mapper.readValue(s, clazz)) - .filter(r => r.getDataInfo != null && !r.getDataInfo.getDeletedbyinference); - - implicit val encoder: Encoder[T] = Encoders.kryo(clazz) - spark - .createDataset(rdd) - .as[T] - .write - .mode(SaveMode.Overwrite) - .save(s"$entityPath/$entityType") - } - */ - }