dnet-hadoop/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala

126 lines
6.9 KiB
Scala
Raw Normal View History

package eu.dnetlib.doiboost
import eu.dnetlib.dhp.application.ArgumentApplicationParser
2020-05-26 09:15:33 +02:00
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
2020-05-26 09:15:33 +02:00
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
2020-05-26 09:15:33 +02:00
import scala.collection.JavaConverters._
/**
 * Spark entry point that assembles the DOIBoost publication dataset.
 *
 * Starting from the datasets stored under `workingDirPath`, the job:
 *  - left-joins the Crossref publications with UnpayWall, ORCID and MAG (in this order),
 *    merging every matching record into the Crossref one and persisting each intermediate
 *    result (`firstJoin`, `secondJoin`, `doiBoostPublication`);
 *  - filters and fixes the merged publications through [[DoiBoostMappingUtil]]
 *    (`doiBoostPublicationFiltered`);
 *  - joins the filtered publications with the paper/affiliation tables and writes a pair of
 *    inverse relations per match (`doiBoostPublicationAffiliation`).
 *
 * NOTE: "Phase 1", which copies the source datasets into the working directory, is currently
 * commented out, so the job reads `crossrefPublication`, `uwPublication`, `orcidPublication`
 * and `magPublication` directly from `workingDirPath`.
 */
object SparkGenerateDoiBoost {

  /**
   * Merge step applied to every row produced by the left joins.
   *
   * @param item pair of (id, publication) tuples; the right-hand tuple, or the publication
   *             it carries, may be null when the left join found no match
   * @return the left-hand publication, updated in place via `mergeFrom` when a right-hand
   *         publication is available
   */
  private def applyMerge(item: ((String, Publication), (String, Publication))): Publication = {
    val crossrefPub = item._1._2
    for {
      rightTuple <- Option(item._2)
      otherPub <- Option(rightTuple._2)
    } crossrefPub.mergeFrom(otherPub)
    crossrefPub
  }

  /**
   * Builds one direction of the publication/organization affiliation relation.
   *
   * @param source   identifier stored as the relation source
   * @param target   identifier stored as the relation target
   * @param relClass relation class describing the direction
   * @param pub      publication whose `dataInfo` and `collectedfrom` are copied onto the relation
   * @return the populated relation
   */
  private def affiliationRelation(source: String, target: String, relClass: String, pub: Publication): Relation = {
    val rel = new Relation
    rel.setSource(source)
    rel.setTarget(target)
    rel.setRelType("resultOrganization")
    rel.setRelClass(relClass)
    rel.setSubRelType("affiliation")
    rel.setDataInfo(pub.getDataInfo)
    rel.setCollectedfrom(pub.getCollectedfrom)
    rel
  }

  /**
   * Runs the whole generation pipeline.
   *
   * @param args command line arguments, parsed according to
   *             `/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json`
   */
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json")))
    parser.parseArgument(args)
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    import spark.implicits._

    // The source paths below are only consumed by the (currently disabled) Phase 1;
    // they are kept so that the phase can be re-enabled without further changes.
    val crossrefPublicationPath = parser.get("crossrefPublicationPath")
    val crossrefDatasetPath = parser.get("crossrefDatasetPath")
    val uwPublicationPath = parser.get("uwPublicationPath")
    val magPublicationPath = parser.get("magPublicationPath")
    val orcidPublicationPath = parser.get("orcidPublicationPath")
    val workingDirPath = parser.get("workingDirPath")

    //    logger.info("Phase 1) repartition and move all the dataset in a same working folder")
    //    spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
    //    spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
    //    spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
    //    spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
    //    spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")

    // Kryo encoders for the OAF model classes used by the typed Dataset operations below.
    implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
    implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
    implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
    implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]

    /** Loads the publications stored under `path`, keyed by their identifier. */
    def loadPublicationsById(path: String): Dataset[(String, Publication)] =
      spark.read.load(path).as[Publication].map(p => (p.getId, p))(tupleForJoinEncoder)

    /** Left-joins `left` with `right` on the key, merges matching records and overwrites `outputPath`. */
    def leftJoinAndMerge(left: Dataset[(String, Publication)], right: Dataset[(String, Publication)], outputPath: String): Unit =
      left
        .joinWith(right, left("_1").equalTo(right("_1")), "left")
        .map(applyMerge)
        .write.mode(SaveMode.Overwrite).save(outputPath)

    logger.info("Phase 2) Join Crossref with UnpayWall")
    val crossrefPublication: Dataset[(String, Publication)] = loadPublicationsById(s"$workingDirPath/crossrefPublication")
    val uwPublication: Dataset[(String, Publication)] = loadPublicationsById(s"$workingDirPath/uwPublication")
    leftJoinAndMerge(crossrefPublication, uwPublication, s"$workingDirPath/firstJoin")

    logger.info("Phase 3) Join Result with ORCID")
    // Each phase re-reads the output of the previous one from disk.
    val fj: Dataset[(String, Publication)] = loadPublicationsById(s"$workingDirPath/firstJoin")
    val orcidPublication: Dataset[(String, Publication)] = loadPublicationsById(s"$workingDirPath/orcidPublication")
    leftJoinAndMerge(fj, orcidPublication, s"$workingDirPath/secondJoin")

    // Previously logged as a second "Phase 3", which made the job logs ambiguous.
    logger.info("Phase 4) Join Result with MAG")
    val sj: Dataset[(String, Publication)] = loadPublicationsById(s"$workingDirPath/secondJoin")
    val magPublication: Dataset[(String, Publication)] = loadPublicationsById(s"$workingDirPath/magPublication")
    leftJoinAndMerge(sj, magPublication, s"$workingDirPath/doiBoostPublication")

    // Keep only the publications accepted by DoiBoostMappingUtil and fix them using the hosted-by map.
    val doiBoostPublication: Dataset[Publication] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication]
    val hostedByMap = DoiBoostMappingUtil.retrieveHostedByMap()
    doiBoostPublication
      .filter(p => DoiBoostMappingUtil.filterPublication(p))
      .map(p => DoiBoostMappingUtil.fixPublication(p, hostedByMap))
      .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")

    // Affiliations: only those carrying a GridId are retained, then attached to the papers.
    val affiliationPath = parser.get("affiliationPath")
    val paperAffiliationPath = parser.get("paperAffiliationPath")
    val affiliation = spark.read.load(affiliationPath).where(col("GridId").isNotNull).select(col("AffiliationId"), col("GridId"))
    val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId"))
    val paperGridAffiliations: Dataset[DoiBoostAffiliation] = paperAffiliation
      .joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId")))
      .select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"))
      .as[DoiBoostAffiliation]

    // Key the filtered publications by the identifier extracted from their original ids,
    // discarding those for which no identifier could be extracted.
    val magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication]
      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder)
      .filter(s => s._1 != null)

    // Emit both directions of the affiliation relation for every (publication, affiliation) match.
    magPubs.joinWith(paperGridAffiliations, magPubs("_1").equalTo(paperGridAffiliations("PaperId"))).flatMap(item => {
      val pub: Publication = item._1._2
      // Computed once and shared by both relations; assumes generateGridAffiliationId is deterministic.
      val organizationId = DoiBoostMappingUtil.generateGridAffiliationId(item._2.GridId)
      List(
        affiliationRelation(pub.getId, organizationId, "hasAuthorInstitution", pub),
        affiliationRelation(organizationId, pub.getId, "isAuthorInstitutionOf", pub)
      )
    })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
  }
}