change implementation of SparkResolveRelation to also generate a JSON RDD in output

Sandro La Bruzzo 2021-07-25 09:51:24 +02:00
parent a0393607a7
commit 3920c69bc8
2 changed files with 14 additions and 4 deletions

GroupEntitiesSparkJob.java

@@ -38,7 +38,8 @@ import scala.Tuple2;
 /**
  * Groups the graph content by entity identifier to ensure ID uniqueness
  */
-public class GroupEntitiesSparkJob {
+public class
+GroupEntitiesSparkJob {
 	private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);

SparkResolveRelation.scala

@@ -1,8 +1,10 @@
 package eu.dnetlib.dhp.sx.graph
+import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -40,7 +42,9 @@ object SparkResolveRelation {
     extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)

-    val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)]
+    val mappper = new ObjectMapper()
+
+    val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]

     val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
@@ -68,6 +72,11 @@ object SparkResolveRelation {
       .write
       .mode(SaveMode.Overwrite)
       .save(s"$workingPath/relation_resolved")
+
+    spark.read.load(s"$workingPath/relation_resolved").as[Relation]
+      .map(r => mappper.writeValueAsString(r))
+      .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
+
   }
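The block added above is the substance of the commit: after the resolved relations are saved as parquet, they are re-read and dumped one JSON document per line into a gzip-compressed text directory, presumably for consumers that expect JSON text rather than parquet. Below is a minimal self-contained sketch of the same pattern; the SparkSession setup, the working path, and the kryo encoder are assumptions for illustration, not part of the commit (the real job takes its session and paths from its ArgumentApplicationParser).

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

object RelationJsonDumpSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session and working path, for the sketch only.
    val spark = SparkSession.builder()
      .appName("RelationJsonDumpSketch")
      .master("local[*]")
      .getOrCreate()
    val workingPath = "/tmp/resolve_relation_working"

    implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
    import spark.implicits._

    // Jackson's ObjectMapper is java.io.Serializable, so a single
    // driver-side instance can be captured by the map closures below,
    // mirroring how the commit uses its `mappper` instance.
    val mapper = new ObjectMapper()

    // Step added by the commit: re-read the parquet output and save each
    // Relation as one JSON line, compressed with gzip.
    spark.read.load(s"$workingPath/relation_resolved").as[Relation]
      .map(r => mapper.writeValueAsString(r))
      .rdd
      .saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])

    // Consuming the JSON RDD: textFile decompresses .gz parts transparently.
    val restored: Dataset[Relation] = spark.read.textFile(s"$workingPath/relation")
      .map(s => mapper.readValue(s, classOf[Relation]))

    println(s"round-tripped ${restored.count()} relations")
    spark.stop()
  }
}

One design note: serializing through an RDD of JSON strings (rather than Dataset.write.json) keeps the exact Jackson representation of the Relation bean and allows the Hadoop GzipCodec to be passed directly to saveAsTextFile.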
@@ -102,7 +111,7 @@ object SparkResolveRelation {
       .map(s => s._2)
       .write
       .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/resolvedPid")
+      .save(s"$workingPath/relationResolvedPid")
   }
@@ -124,7 +133,7 @@ object SparkResolveRelation {
       .map(s => s._2)
       .write
       .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/resolvedPid")
+      .save(s"$workingPath/relationResolvedPid")
   }

   def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
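The body of convertPidToDNETIdentifier falls outside this hunk, so only its signature is visible. As a purely hypothetical usage sketch, a caller building the lowercased join keys stored under relationResolvedPid might look like the following; the PID values and the result format are assumptions, not shown by the diff.

// Hypothetical call sites: the diff only exposes the signature above.
val doiKey: String = convertPidToDNETIdentifier("10.5281/zenodo.1234567", "doi")
val pmidKey: String = convertPidToDNETIdentifier("23456789", "pmid")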