added better filter for openCitations

This commit is contained in:
Sandro La Bruzzo 2022-05-11 15:29:57 +02:00
parent 77bc9863e9
commit ca8d26bcb4
4 changed files with 11 additions and 225 deletions

View File

@ -7,6 +7,7 @@ import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkConvertRDDtoDataset {
@ -94,21 +95,29 @@ object SparkConvertRDDtoDataset {
log.info("Converting Relation")
val relationSemanticFilter = List(
"cites",
"iscitedby",
// "cites",
// "iscitedby",
"merges",
"ismergedin",
"HasAmongTopNSimilarDocuments",
"IsAmongTopNSimilarDocuments"
)
val rddRelation = spark.sparkContext
.textFile(s"$sourcePath/relation")
.map(s => mapper.readValue(s, classOf[Relation]))
.filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
.filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
//filter OpenCitations relations
.filter(r => r.getCollectedfrom!= null && r.getCollectedfrom.size()>0 && !r.getCollectedfrom.asScala.exists(k => "opencitations".equalsIgnoreCase(k.getValue)))
.filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
}
}

View File

@ -1,136 +0,0 @@
package eu.dnetlib.dhp.sx.graph.pangaea
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import java.util.regex.Pattern
import scala.language.postfixOps
import scala.xml.{Elem, Node, XML}
case class PangaeaDataModel(
identifier: String,
title: List[String],
objectType: List[String],
creator: List[String],
publisher: List[String],
dataCenter: List[String],
subject: List[String],
language: String,
rights: String,
parent: String,
relation: List[String],
linkage: List[(String, String)]
) {}
object PangaeaUtils {
def toDataset(input: String): PangaeaDataModel = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
val xml = (json \ "xml").extract[String]
parseXml(xml)
}
def findDOIInRelation(input: List[String]): List[String] = {
val pattern = Pattern.compile("\\b(10[.][0-9]{4,}(?:[.][0-9]+)*\\/(?:(?![\"&\\'<>])\\S)+)\\b")
input
.map(i => {
val matcher = pattern.matcher(i)
if (matcher.find())
matcher.group(0)
else
null
})
.filter(i => i != null)
}
def attributeOpt(attribute: String, node: Node): Option[String] =
node.attribute(attribute) flatMap (_.headOption) map (_.text)
def extractLinkage(node: Elem): List[(String, String)] = {
(node \ "linkage")
.map(n => (attributeOpt("type", n), n.text))
.filter(t => t._1.isDefined)
.map(t => (t._1.get, t._2))(collection.breakOut)
}
def parseXml(input: String): PangaeaDataModel = {
val xml = XML.loadString(input)
val identifier = (xml \ "identifier").text
val title: List[String] = (xml \ "title").map(n => n.text)(collection.breakOut)
val pType: List[String] = (xml \ "type").map(n => n.text)(collection.breakOut)
val creators: List[String] = (xml \ "creator").map(n => n.text)(collection.breakOut)
val publisher: List[String] = (xml \ "publisher").map(n => n.text)(collection.breakOut)
val dataCenter: List[String] = (xml \ "dataCenter").map(n => n.text)(collection.breakOut)
val subject: List[String] = (xml \ "subject").map(n => n.text)(collection.breakOut)
val language = (xml \ "language").text
val rights = (xml \ "rights").text
val parentIdentifier = (xml \ "parentIdentifier").text
val relation: List[String] = (xml \ "relation").map(n => n.text)(collection.breakOut)
val relationFiltered = findDOIInRelation(relation)
val linkage: List[(String, String)] = extractLinkage(xml)
PangaeaDataModel(
identifier,
title,
pType,
creators,
publisher,
dataCenter,
subject,
language,
rights,
parentIdentifier,
relationFiltered,
linkage
)
}
def getDatasetAggregator(): Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] =
new Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] {
override def zero: PangaeaDataModel = null
override def reduce(b: PangaeaDataModel, a: (String, PangaeaDataModel)): PangaeaDataModel = {
if (b == null)
a._2
else {
if (a == null)
b
else {
if (b.title != null && b.title.nonEmpty)
b
else
a._2
}
}
}
override def merge(b1: PangaeaDataModel, b2: PangaeaDataModel): PangaeaDataModel = {
if (b1 == null)
b2
else {
if (b2 == null)
b1
else {
if (b1.title != null && b1.title.nonEmpty)
b1
else
b2
}
}
}
override def finish(reduction: PangaeaDataModel): PangaeaDataModel = reduction
override def bufferEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
override def outputEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
}
}

View File

@ -1,58 +0,0 @@
package eu.dnetlib.dhp.sx.graph.pangaea
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.io.Source
object SparkGeneratePanagaeaDataset {
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(
Source
.fromInputStream(
getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json")
)
.mkString
)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(SparkGeneratePanagaeaDataset.getClass.getSimpleName)
.master(parser.get("master"))
.getOrCreate()
parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}"))
logger.info("Converting sequential file into Dataset")
val sc: SparkContext = spark.sparkContext
val workingPath: String = parser.get("workingPath")
implicit val pangaeaEncoders: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
val inputRDD: RDD[PangaeaDataModel] =
sc.textFile(s"$workingPath/update").map(s => PangaeaUtils.toDataset(s))
spark
.createDataset(inputRDD)
.as[PangaeaDataModel]
.map(s => (s.identifier, s))(Encoders.tuple(Encoders.STRING, pangaeaEncoders))
.groupByKey(_._1)(Encoders.STRING)
.agg(PangaeaUtils.getDatasetAggregator().toColumn)
.map(s => s._2)
.write
.mode(SaveMode.Overwrite)
.save(s"$workingPath/dataset")
}
}

View File

@ -1,29 +0,0 @@
package eu.dnetlib.dhp.sx.pangaea
import eu.dnetlib.dhp.sx.graph.pangaea.PangaeaUtils
import org.junit.jupiter.api.Test
import java.util.TimeZone
import java.text.SimpleDateFormat
import java.util.Date
import scala.io.Source
class PangaeaTransformTest {
@Test
def test_dateStamp() :Unit ={
val d = new Date()
val s:String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")format d}Z"
println(s)
val xml = Source.fromInputStream(getClass.getResourceAsStream("input.xml")).mkString
println(PangaeaUtils.parseXml(xml))
}
}