package eu.dnetlib.doiboost
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset, Organization}
import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
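
/**
 * SparkGenerateDoiBoost builds the DoiBoost dataset: starting from the
 * publications already mapped from Crossref, it progressively merges in the
 * UnpayWall, ORCID and MAG versions of the same records, attaches hostedBy
 * information, and finally derives publication/organization affiliation
 * relations and Organization records from the MAG affiliation data.
 *
 * Illustrative invocation only (argument names follow the declarations in
 * generate_doiboost_params.json; jar name and paths are hypothetical):
 *
 *   spark-submit --class eu.dnetlib.doiboost.SparkGenerateDoiBoost dhp-doiboost.jar \
 *     --master yarn --workingDirPath /data/doiboost --hostedByMapPath /data/hostedByMap \
 *     --affiliationPath /data/mag/Affiliations --paperAffiliationPath /data/mag/PaperAffiliations
 */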
object SparkGenerateDoiBoost {

  def main(args: Array[String]): Unit = {
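
    // All runtime arguments come from generate_doiboost_params.json:
    // master, workingDirPath, hostedByMapPath, affiliationPath, paperAffiliationPath.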
    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json")))
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    import spark.implicits._

    val hostedByMapPath = parser.get("hostedByMapPath")
    val workingDirPath = parser.get("workingDirPath")

    // Kryo encoders for the OAF beans shuffled by the joins below
    implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
    implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
    implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
    implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
    implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
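
    // Each source was already mapped to OAF Publications keyed by id; the phases
    // below left-join the sources pairwise, folding every match into the Crossref record.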

    logger.info("Phase 2) Join Crossref with UnpayWall")
    val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p))
    val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p))

    // Keep the Crossref publication as the base record and, when the left join
    // found a counterpart, merge the other record into it.
    def applyMerge(item: ((String, Publication), (String, Publication))): Publication = {
      val crossrefPub = item._1._2
      if (item._2 != null) {
        val otherPub = item._2._2
        if (otherPub != null) {
          crossrefPub.mergeFrom(otherPub)
        }
      }
      crossrefPub
    }

    crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")), "left")
      .map(applyMerge)
      .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin")

    logger.info("Phase 3) Join Result with ORCID")
    val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
    val orcidPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p))
    fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left")
      .map(applyMerge)
      .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/secondJoin")

    logger.info("Phase 4) Join Result with MAG")
    val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
    val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
    sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left")
      .map(applyMerge)
      .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")

    val doiBoostPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication]
      .filter(p => DoiBoostMappingUtil.filterPublication(p))
      .map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder)
    val hostedByDataset: Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem))
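
    // hostedByMapPath points to a text file with one hostedBy item per line;
    // toHostedByItem parses each line into a (key, HostedByItemType) pair whose key
    // is expected to match the ISSN-based key produced by toISSNPair above, so the
    // left join below can attach hosting-datasource information to each publication.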
    doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left")
      .map(DoiBoostMappingUtil.fixPublication)
      .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")

    val affiliationPath = parser.get("affiliationPath")
    val paperAffiliationPath = parser.get("paperAffiliationPath")

    // MAG affiliation data, reduced to the columns used below
    val affiliation = spark.read.load(affiliationPath).select(col("AffiliationId"), col("GridId"), col("OfficialPage"), col("DisplayName"))
    val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId"))

    val a: Dataset[DoiBoostAffiliation] = paperAffiliation
      .joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId")))
      .select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"), col("_2.OfficialPage"), col("_2.DisplayName"))
      .as[DoiBoostAffiliation]
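
    // `a` links every MAG PaperId to the metadata of its affiliations (GRID id,
    // official page, display name); it drives both outputs below.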

    val magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication]
      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder)
      .filter(s => s._1 != null)

    magPubs.joinWith(a, magPubs("_1").equalTo(a("PaperId"))).flatMap(item => {
      val pub: Publication = item._1._2
      val affiliation = item._2
      // Prefer the GRID-based identifier when available, otherwise derive one from the MAG AffiliationId
      val affId: String = if (affiliation.GridId.isDefined) DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId.get) else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
      // Publication -> Organization relation
      val r: Relation = new Relation
      r.setSource(pub.getId)
      r.setTarget(affId)
      r.setRelType("resultOrganization")
      r.setRelClass("hasAuthorInstitution")
      r.setSubRelType("affiliation")
      r.setDataInfo(pub.getDataInfo)
      r.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
      // Inverse relation: Organization -> Publication
      val r1: Relation = new Relation
      r1.setTarget(pub.getId)
      r1.setSource(affId)
      r1.setRelType("resultOrganization")
      r1.setRelClass("isAuthorInstitutionOf")
      r1.setSubRelType("affiliation")
      r1.setDataInfo(pub.getDataInfo)
      r1.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
      List(r, r1)
    })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
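
    // Affiliations without a GRID id are emitted as new Organization records
    // (GRID-mapped ones are assumed to be already known to the graph).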
    magPubs.joinWith(a, magPubs("_1").equalTo(a("PaperId"))).map(item => {
      val affiliation = item._2
      if (affiliation.GridId.isEmpty) {
        val o = new Organization
        o.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
        o.setDataInfo(DoiBoostMappingUtil.generateDataInfo())
        o.setId(DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString))
        o.setOriginalId(List(affiliation.AffiliationId.toString).asJava)
        if (affiliation.DisplayName.isDefined)
          o.setLegalname(DoiBoostMappingUtil.asField(affiliation.DisplayName.get))
        if (affiliation.OfficialPage.isDefined)
          o.setWebsiteurl(DoiBoostMappingUtil.asField(affiliation.OfficialPage.get))
        o.setCountry(DoiBoostMappingUtil.getUnknownCountry())
        o
      } else
        null
    }).filter(o => o != null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization")
  }
}