diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 6ee575e2a8..11d026f025 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -4,7 +4,12 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.{JField, JObject, JString} +import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ @@ -29,23 +34,11 @@ object SparkResolveRelation { val workingPath = parser.get("workingPath") log.info(s"workingPath -> $workingPath") - - implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation]) import spark.implicits._ - val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] - entities.flatMap(e => e.getPid.asScala - .map(p => - convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) - .filter(s => s!= null) - .map(s => (s,e.getId)) - ).groupByKey(_._1) - .reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) - .map(s =>s._2) - .write - .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") + + extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)] @@ -74,11 +67,65 @@ object SparkResolveRelation { }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50")) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedRelation") + .save(s"$workingPath/relation") } + private def extractPidsFromRecord(input:String):(String,List[(String,String)]) = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + val id:String = (json \ "id").extract[String] + val result: List[(String,String)] = for { + JObject(pids) <- json \ "pid" + JField("value", JString(pidValue)) <- pids + JField("qualifier", JObject(qualifier)) <- pids + JField("classname", JString(pidType)) <- qualifier + } yield (pidValue, pidType) + (id,result) + } + private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = { + import spark.implicits._ + + val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*") + .map(i => extractPidsFromRecord(i)) + .filter(s => s != null && s._2!=null && s._2.nonEmpty) + .flatMap{ p => + p._2.map(pid => + (p._1,convertPidToDNETIdentifier(pid._1, pid._2)) + ) + } + + spark.createDataset(d) + .groupByKey(_._1) + .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedPid") + } + + + /* + This method should be used once we finally convert everythings in Kryo dataset + instead of using rdd of json + */ + private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = { + import spark.implicits._ + implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result]) + val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result] + entities.flatMap(e => e.getPid.asScala + .map(p => + convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid)) + .filter(s => s != null) + .map(s => (s, e.getId)) + ).groupByKey(_._1) + .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y) + .map(s => s._2) + .write + .mode(SaveMode.Overwrite) + .save(s"$workingPath/resolvedPid") + } def convertPidToDNETIdentifier(pid:String, pidType: String):String = { if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)