diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala index 0073afff5..a54acf1d3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset} import org.apache.commons.io.IOUtils import org.apache.commons.lang3.StringUtils @@ -99,44 +100,32 @@ object SparkConvertRDDtoDataset { log.info("Converting Relation") - if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) { + val relationSemanticFilter = List( + ModelConstants.MERGES, + ModelConstants.IS_MERGED_IN, + ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS, + ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS + ) - val rddRelation = spark.sparkContext - .textFile(s"$sourcePath/relation") - .map(s => mapper.readValue(s, classOf[Relation])) - .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) - .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) - //filter OpenCitations relations - .filter(r => - r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k => - "opencitations".equalsIgnoreCase(k.getValue) - ) - ) - .filter(r => r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation)) - spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") - } else { - - val relationSemanticFilter = List( - "merges", - "ismergedin", - "HasAmongTopNSimilarDocuments", - "IsAmongTopNSimilarDocuments" + val rddRelation = spark.sparkContext + .textFile(s"$sourcePath/relation") + .map(s => mapper.readValue(s, classOf[Relation])) + .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) + .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) + //filter OpenCitations relations + .filter(r => + r.getDataInfo.getProvenanceaction != null && + !"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid) ) + .filter(r => filterRelations(filterRelation, relationSemanticFilter, r)) + spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") + } - val rddRelation = spark.sparkContext - .textFile(s"$sourcePath/relation") - .map(s => mapper.readValue(s, classOf[Relation])) - .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false) - .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50")) - //filter OpenCitations relations - .filter(r => - r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k => - "opencitations".equalsIgnoreCase(k.getValue) - ) - ) - .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))) - spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath") + private def filterRelations(filterRelation: String, relationSemanticFilter: List[String], r: Relation): Boolean = { + if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) { + r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation) + } else { + !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)) } - } }