|
|
|
@ -2,6 +2,7 @@ package eu.dnetlib.dhp.sx.graph
|
|
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper
|
|
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|
|
|
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
|
|
|
|
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
|
|
|
|
|
import org.apache.commons.io.IOUtils
|
|
|
|
|
import org.apache.commons.lang3.StringUtils
|
|
|
|
@ -99,44 +100,32 @@ object SparkConvertRDDtoDataset {
|
|
|
|
|
|
|
|
|
|
log.info("Converting Relation")
|
|
|
|
|
|
|
|
|
|
if (filterRelation != null && StringUtils.isNoneBlank(filterRelation)) {
|
|
|
|
|
|
|
|
|
|
val rddRelation = spark.sparkContext
|
|
|
|
|
.textFile(s"$sourcePath/relation")
|
|
|
|
|
.map(s => mapper.readValue(s, classOf[Relation]))
|
|
|
|
|
.filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
|
|
|
|
|
.filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
|
|
|
|
|
//filter OpenCitations relations
|
|
|
|
|
.filter(r =>
|
|
|
|
|
r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
|
|
|
|
|
"opencitations".equalsIgnoreCase(k.getValue)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
.filter(r => r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation))
|
|
|
|
|
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
|
|
|
|
} else {
|
|
|
|
|
val relationSemanticFilter = List(
|
|
|
|
|
ModelConstants.MERGES,
|
|
|
|
|
ModelConstants.IS_MERGED_IN,
|
|
|
|
|
ModelConstants.HAS_AMONG_TOP_N_SIMILAR_DOCS,
|
|
|
|
|
ModelConstants.IS_AMONG_TOP_N_SIMILAR_DOCS
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
val relationSemanticFilter = List(
|
|
|
|
|
"merges",
|
|
|
|
|
"ismergedin",
|
|
|
|
|
"HasAmongTopNSimilarDocuments",
|
|
|
|
|
"IsAmongTopNSimilarDocuments"
|
|
|
|
|
val rddRelation = spark.sparkContext
|
|
|
|
|
.textFile(s"$sourcePath/relation")
|
|
|
|
|
.map(s => mapper.readValue(s, classOf[Relation]))
|
|
|
|
|
.filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
|
|
|
|
|
.filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
|
|
|
|
|
//filter OpenCitations relations
|
|
|
|
|
.filter(r =>
|
|
|
|
|
r.getDataInfo.getProvenanceaction != null &&
|
|
|
|
|
!"sysimport:crosswalk:opencitations".equals(r.getDataInfo.getProvenanceaction.getClassid)
|
|
|
|
|
)
|
|
|
|
|
.filter(r => filterRelations(filterRelation, relationSemanticFilter, r))
|
|
|
|
|
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val rddRelation = spark.sparkContext
|
|
|
|
|
.textFile(s"$sourcePath/relation")
|
|
|
|
|
.map(s => mapper.readValue(s, classOf[Relation]))
|
|
|
|
|
.filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
|
|
|
|
|
.filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
|
|
|
|
|
//filter OpenCitations relations
|
|
|
|
|
.filter(r =>
|
|
|
|
|
r.getCollectedfrom != null && r.getCollectedfrom.size() > 0 && !r.getCollectedfrom.asScala.exists(k =>
|
|
|
|
|
"opencitations".equalsIgnoreCase(k.getValue)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
.filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
|
|
|
|
|
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
|
|
|
|
|
/** Decides whether a Relation should be kept when converting the RDD to a Dataset.
  *
  * Two filtering modes:
  *  - when `filterRelation` is non-blank, keep only relations whose subRelType
  *    matches it (case-insensitive);
  *  - otherwise, drop relations whose relClass appears in the semantic deny-list.
  *
  * @param filterRelation         optional subRelType to keep; blank/null disables this mode
  * @param relationSemanticFilter relClass values to exclude when no explicit filter is given
  * @param r                      the relation under test
  * @return true when the relation passes the active filter
  */
private def filterRelations(filterRelation: String, relationSemanticFilter: List[String], r: Relation): Boolean = {
  // isNotBlank already returns false for null, so the separate null check is
  // unnecessary. The original used the varargs isNoneBlank, which is meant for
  // checking several CharSequences at once; isNotBlank is the intended
  // single-argument form (behaviorally identical for one argument).
  if (StringUtils.isNotBlank(filterRelation)) {
    // Explicit filter: keep only the requested subRelType (case-insensitive).
    r.getSubRelType != null && r.getSubRelType.equalsIgnoreCase(filterRelation)
  } else {
    // No explicit filter: reject relations on the semantic deny-list.
    !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass))
  }
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|