diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
index f13c14da5..7c3a212ac 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -7,6 +7,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.JavaConverters._
 
 object SparkConvertRDDtoDataset {
 
@@ -94,21 +95,29 @@ object SparkConvertRDDtoDataset {
     log.info("Converting Relation")
 
     val relationSemanticFilter = List(
-      "cites",
-      "iscitedby",
+//      "cites",
+//      "iscitedby",
       "merges",
       "ismergedin",
      "HasAmongTopNSimilarDocuments",
      "IsAmongTopNSimilarDocuments"
     )
+
 
     val rddRelation = spark.sparkContext
       .textFile(s"$sourcePath/relation")
       .map(s => mapper.readValue(s, classOf[Relation]))
       .filter(r => r.getDataInfo != null && r.getDataInfo.getDeletedbyinference == false)
       .filter(r => r.getSource.startsWith("50") && r.getTarget.startsWith("50"))
+      //filter OpenCitations relations
+      .filter(r => r.getCollectedfrom!= null && r.getCollectedfrom.size()>0 && !r.getCollectedfrom.asScala.exists(k => "opencitations".equalsIgnoreCase(k.getValue)))
       .filter(r => !relationSemanticFilter.exists(k => k.equalsIgnoreCase(r.getRelClass)))
     spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+
+
+
+
+
 
   }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/pangaea/PangaeaUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/pangaea/PangaeaUtils.scala
deleted file mode 100644
index 23f4da6c7..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/pangaea/PangaeaUtils.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-package eu.dnetlib.dhp.sx.graph.pangaea
-
-import org.apache.spark.sql.expressions.Aggregator
-import org.apache.spark.sql.{Encoder, Encoders}
-import org.json4s
-import org.json4s.DefaultFormats
-import org.json4s.jackson.JsonMethods.parse
-
-import java.util.regex.Pattern
-import scala.language.postfixOps
-import scala.xml.{Elem, Node, XML}
-
-case class PangaeaDataModel(
-  identifier: String,
-  title: List[String],
-  objectType: List[String],
-  creator: List[String],
-  publisher: List[String],
-  dataCenter: List[String],
-  subject: List[String],
-  language: String,
-  rights: String,
-  parent: String,
-  relation: List[String],
-  linkage: List[(String, String)]
-) {}
-
-object PangaeaUtils {
-
-  def toDataset(input: String): PangaeaDataModel = {
-    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
-    lazy val json: json4s.JValue = parse(input)
-    val xml = (json \ "xml").extract[String]
-    parseXml(xml)
-  }
-
-  def findDOIInRelation(input: List[String]): List[String] = {
-    val pattern = Pattern.compile("\\b(10[.][0-9]{4,}(?:[.][0-9]+)*\\/(?:(?![\"&\\'<>])\\S)+)\\b")
-    input
-      .map(i => {
-        val matcher = pattern.matcher(i)
-        if (matcher.find())
-          matcher.group(0)
-        else
-          null
-      })
-      .filter(i => i != null)
-  }
-
-  def attributeOpt(attribute: String, node: Node): Option[String] =
-    node.attribute(attribute) flatMap (_.headOption) map (_.text)
-
-  def extractLinkage(node: Elem): List[(String, String)] = {
-    (node \ "linkage")
-      .map(n => (attributeOpt("type", n), n.text))
-      .filter(t => t._1.isDefined)
-      .map(t => (t._1.get, t._2))(collection.breakOut)
-  }
-
-  def parseXml(input: String): PangaeaDataModel = {
-    val xml = XML.loadString(input)
-
-    val identifier = (xml \ "identifier").text
-    val title: List[String] = (xml \ "title").map(n => n.text)(collection.breakOut)
-    val pType: List[String] = (xml \ "type").map(n => n.text)(collection.breakOut)
-    val creators: List[String] = (xml \ "creator").map(n => n.text)(collection.breakOut)
-    val publisher: List[String] = (xml \ "publisher").map(n => n.text)(collection.breakOut)
-    val dataCenter: List[String] = (xml \ "dataCenter").map(n => n.text)(collection.breakOut)
-    val subject: List[String] = (xml \ "subject").map(n => n.text)(collection.breakOut)
-    val language = (xml \ "language").text
-    val rights = (xml \ "rights").text
-    val parentIdentifier = (xml \ "parentIdentifier").text
-    val relation: List[String] = (xml \ "relation").map(n => n.text)(collection.breakOut)
-    val relationFiltered = findDOIInRelation(relation)
-    val linkage: List[(String, String)] = extractLinkage(xml)
-
-    PangaeaDataModel(
-      identifier,
-      title,
-      pType,
-      creators,
-      publisher,
-      dataCenter,
-      subject,
-      language,
-      rights,
-      parentIdentifier,
-      relationFiltered,
-      linkage
-    )
-  }
-
-  def getDatasetAggregator(): Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] =
-    new Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] {
-
-      override def zero: PangaeaDataModel = null
-
-      override def reduce(b: PangaeaDataModel, a: (String, PangaeaDataModel)): PangaeaDataModel = {
-        if (b == null)
-          a._2
-        else {
-          if (a == null)
-            b
-          else {
-            if (b.title != null && b.title.nonEmpty)
-              b
-            else
-              a._2
-
-          }
-        }
-      }
-
-      override def merge(b1: PangaeaDataModel, b2: PangaeaDataModel): PangaeaDataModel = {
-        if (b1 == null)
-          b2
-        else {
-          if (b2 == null)
-            b1
-          else {
-            if (b1.title != null && b1.title.nonEmpty)
-              b1
-            else
-              b2
-
-          }
-        }
-      }
-      override def finish(reduction: PangaeaDataModel): PangaeaDataModel = reduction
-
-      override def bufferEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
-
-      override def outputEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
-    }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala
deleted file mode 100644
index 8ff8a8b1a..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/pangaea/SparkGeneratePanagaeaDataset.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package eu.dnetlib.dhp.sx.graph.pangaea
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.slf4j.{Logger, LoggerFactory}
-import scala.collection.JavaConverters._
-
-import scala.io.Source
-
-object SparkGeneratePanagaeaDataset {
-
-  def main(args: Array[String]): Unit = {
-    val logger: Logger = LoggerFactory.getLogger(getClass)
-    val conf: SparkConf = new SparkConf()
-    val parser = new ArgumentApplicationParser(
-      Source
-        .fromInputStream(
-          getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json")
-        )
-        .mkString
-    )
-    parser.parseArgument(args)
-
-    val spark: SparkSession =
-      SparkSession
-        .builder()
-        .config(conf)
-        .appName(SparkGeneratePanagaeaDataset.getClass.getSimpleName)
-        .master(parser.get("master"))
-        .getOrCreate()
-
-    parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}"))
-    logger.info("Converting sequential file into Dataset")
-    val sc: SparkContext = spark.sparkContext
-
-    val workingPath: String = parser.get("workingPath")
-
-    implicit val pangaeaEncoders: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
-
-    val inputRDD: RDD[PangaeaDataModel] =
-      sc.textFile(s"$workingPath/update").map(s => PangaeaUtils.toDataset(s))
-
-    spark
-      .createDataset(inputRDD)
-      .as[PangaeaDataModel]
-      .map(s => (s.identifier, s))(Encoders.tuple(Encoders.STRING, pangaeaEncoders))
-      .groupByKey(_._1)(Encoders.STRING)
-      .agg(PangaeaUtils.getDatasetAggregator().toColumn)
-      .map(s => s._2)
-      .write
-      .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/dataset")
-
-  }
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala
deleted file mode 100644
index b90827e81..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/sx/pangaea/PangaeaTransformTest.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-package eu.dnetlib.dhp.sx.pangaea
-
-import eu.dnetlib.dhp.sx.graph.pangaea.PangaeaUtils
-import org.junit.jupiter.api.Test
-
-import java.util.TimeZone
-import java.text.SimpleDateFormat
-import java.util.Date
-import scala.io.Source
-class PangaeaTransformTest {
-
-
-
-  @Test
-  def test_dateStamp() :Unit ={
-
-
-
-    val d = new Date()
-
-    val s:String = s"${new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")format d}Z"
-    println(s)
-
-
-    val xml = Source.fromInputStream(getClass.getResourceAsStream("input.xml")).mkString
-    println(PangaeaUtils.parseXml(xml))
-  }
-
-}
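
Note: the only behavioural change in this patch is the new collectedfrom filter in SparkConvertRDDtoDataset (the Pangaea sources are simply removed, and the "cites"/"iscitedby" semantics are no longer dropped). Read in isolation, the added predicate amounts to the sketch below; the wrapper object and method name are illustrative only, while Relation and its KeyValue-typed collectedfrom field come from the eu.dnetlib.dhp.schema.oaf model this class already uses.

    import eu.dnetlib.dhp.schema.oaf.Relation
    import scala.collection.JavaConverters._

    // Illustrative helper, not part of the patch: keep a relation only if it carries at
    // least one collectedfrom entry and none of those entries is the OpenCitations datasource.
    object OpenCitationsRelationFilterSketch {

      def keepRelation(r: Relation): Boolean =
        r.getCollectedfrom != null &&
          r.getCollectedfrom.size() > 0 &&
          !r.getCollectedfrom.asScala.exists(k => "opencitations".equalsIgnoreCase(k.getValue))
    }

Relations whose only provenance is OpenCitations are therefore dropped before the RDD is converted to a Dataset and written to $relPath.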