// Source: DHP-Explorer/src/main/java/eu/dnetlib/doiboost/mag/SparkMagCitation.scala
// (file-viewer metadata retained as a comment: 62 lines, 3.0 KiB, Scala)
package eu.dnetlib.doiboost.mag
import com.fasterxml.jackson.databind.ObjectMapper
import com.sandro.app.AbstractScalaApplication
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.doiboost.mag.MagUtility.{MagPaperCitation, MagPapers, normalizeDoi}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
/** Spark application that derives DOI-to-DOI citation relations from a
  * Microsoft Academic Graph (MAG) dump and serializes them as JSON relations.
  *
  * @param args command-line arguments forwarded to the base application
  * @param log  logger used by the base application
  */
class SparkMagCitation(args: Array[String], log: Logger) extends AbstractScalaApplication(args, log) {

  /** Resolves MAG citation edges (PaperId -> PaperReferenceId) to pairs of
    * normalized DOIs.
    *
    * Two joins against the (PaperId, DOI) lookup are performed: the first
    * resolves the citing side (materialized under `citation_one_side`), the
    * second resolves the cited side, writing the distinct
    * (citing DOI, cited DOI) pairs under `citation_mag_doi_doi`.
    *
    * @param spark       the active SparkSession
    * @param magBasePath base path of the MAG parquet tables
    * @param workingDir  directory where intermediate and final outputs are written
    */
  def extractCitationRelationDOI(spark: SparkSession, magBasePath: String, workingDir: String) = {
    import spark.implicits._

    // Papers that have both an identifier and a DOI.
    val papersDs: Dataset[(Long, String)] = spark.read
      .load(s"$magBasePath/Papers")
      .select("PaperId", "Doi")
      .where(col("PaperId").isNotNull)
      .where(col("Doi").isNotNull)
      .as[(Long, String)]

    // Raw citation edges: citing PaperId -> cited PaperId.
    val citationDS: Dataset[(Long, Long)] = spark.read
      .load(s"$magBasePath/PaperCitationContexts")
      .select("PaperId", "PaperReferenceId")
      .where(col("PaperId").isNotNull)
      .where(col("PaperReferenceId").isNotNull)
      .as[(Long, Long)]

    // PaperId -> normalized DOI lookup. normalizeDoi may return null, so the
    // null filter MUST precede toLowerCase/trim (the previous order could
    // throw a NullPointerException on the executors).
    val DOI_ID = papersDs
      .map(s => (s._1, normalizeDoi(s._2)))
      .filter(s => s._2 != null)
      .map(s => (s._1, s._2.toLowerCase.trim))

    // First join: resolve the citing side, keeping (citing DOI, cited PaperId).
    citationDS
      .joinWith(DOI_ID, citationDS("PaperId").equalTo(DOI_ID("_1")))
      .map(s => (s._2._2, s._1._2))
      .as[(String, Long)]
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingDir/citation_one_side")

    // Second join: resolve the cited side, keeping distinct (citing DOI, cited DOI).
    val oneSideRelationDs = spark.read.load(s"$workingDir/citation_one_side").as[(String, Long)]
    oneSideRelationDs
      .joinWith(DOI_ID, oneSideRelationDs("_2").equalTo(DOI_ID("_1")), "inner")
      .map(s => (s._1._1, s._2._2))
      .distinct()
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingDir/citation_mag_doi_doi")
  }

  /** Converts the DOI citation pairs into OAF [[Relation]] objects and writes
    * them as gzip-compressed JSON text under `relations`.
    *
    * @param spark      the active SparkSession
    * @param workingDir directory containing `citation_mag_doi_doi` and where
    *                   `relations` is written
    */
  def createRelations(spark: SparkSession, workingDir: String): Unit = {
    import spark.implicits._
    implicit val resultEncoder: Encoder[Relation] = Encoders.kryo[Relation]
    val ctM = spark.read.load(s"$workingDir/citation_mag_doi_doi").as[(String, String)]
    ctM
      .flatMap(t => MagUtility.createCiteRealtion(t._1, t._2))
      .as[Relation]
      // Build the ObjectMapper per partition instead of serializing a
      // driver-side instance into the closure.
      .mapPartitions { relations =>
        val mapper = new ObjectMapper()
        relations.map(r => mapper.writeValueAsString(r))
      }
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .text(s"$workingDir/relations")
  }

  /** Sanity-check step: extracts the source/target identifiers from the JSON
    * relations and saves the distinct set under `distinctID`.
    *
    * @param spark      the active SparkSession
    * @param workingDir directory containing `relations` and where `distinctID`
    *                   is written
    */
  def checkRelation(spark: SparkSession, workingDir: String): Unit = {
    import spark.implicits._
    spark.read
      .text(s"$workingDir/relations")
      .as[String]
      .flatMap(s => MagUtility.extractST(s))
      .distinct()
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingDir/distinctID")
  }

  /** Here all the spark applications runs this method
    * where the whole logic of the spark node is defined
    */
  override def run(): Unit = {
    // NOTE(review): earlier pipeline steps are disabled with hard-coded paths;
    // consider reading paths from `args` instead of editing the source.
    //extractCitationRelationDOI(spark,"/data/doiboost/input/mag/dataset", "/user/sandro.labruzzo/mag")
    //createRelations(spark,"/user/sandro.labruzzo/mag")
    checkRelation(spark, "/user/sandro.labruzzo/mag")
  }
}
/** Companion entry point: builds the application, initializes the Spark
  * context via the base class, and runs the citation pipeline.
  */
object SparkMagCitation {

  val log: Logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val application = new SparkMagCitation(args, log)
    application.initialize().run()
  }
}