next step of MAG conversion implemented

This commit is contained in:
Sandro La Bruzzo 2020-05-19 09:24:45 +02:00
parent a92ee0f41e
commit 486e850bcc
8 changed files with 436 additions and 107 deletions

View File

@ -0,0 +1,104 @@
package eu.dnetlib.doiboost
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, KeyValue, Qualifier, Result, StructuredProperty}
import eu.dnetlib.dhp.utils.DHPUtils
object DoiBoostMappingUtil {
//STATIC STRING
val MAG = "microsoft"
val ORCID = "ORCID"
val CROSSREF = "Crossref"
val UNPAYWALL = "UnpayWall"
val GRID_AC = "grid.ac"
val WIKPEDIA = "wikpedia"
val doiBoostNSPREFIX = "doiboost____"
val OPENAIRE_PREFIX = "openaire____"
val SEPARATOR = "::"
val DNET_LANGUAGES = "dnet:languages"
val PID_TYPES = "dnet:pid_types"
def generateDataInfo(): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
di.setTrust("0.9")
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
di
}
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
sp.setValue(value)
sp
}
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
sp.setValue(value)
sp.setDataInfo(dataInfo)
sp
}
def createCrossrefCollectedFrom(): KeyValue = {
val cf = new KeyValue
cf.setValue(CROSSREF)
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5("crossref"))
cf
}
def generateIdentifier(oaf: Result, doi: String): String = {
val id = DHPUtils.md5(doi.toLowerCase)
if (oaf.isInstanceOf[Dataset])
return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}"
s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}"
}
def createMAGCollectedFrom(): KeyValue = {
val cf = new KeyValue
cf.setValue(MAG)
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(MAG))
cf
}
def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
val q = new Qualifier
q.setClassid(clsName)
q.setClassname(clsValue)
q.setSchemeid(schName)
q.setSchemename(schValue)
q
}
def createQualifier(cls: String, sch: String): Qualifier = {
createQualifier(cls, cls, sch, sch)
}
def asField[T](value: T): Field[T] = {
val tmp = new Field[T]
tmp.setValue(value)
tmp
}
}

View File

@ -14,6 +14,7 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
import scala.util.matching.Regex import scala.util.matching.Regex
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
case class mappingAffiliation(name: String) {} case class mappingAffiliation(name: String) {}
@ -25,18 +26,7 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S
case object Crossref2Oaf { case object Crossref2Oaf {
val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass) val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
//STATIC STRING
val MAG = "MAG"
val ORCID = "ORCID"
val CROSSREF = "Crossref"
val UNPAYWALL = "UnpayWall"
val GRID_AC = "grid.ac"
val WIKPEDIA = "wikpedia"
val doiBoostNSPREFIX = "doiboost____"
val OPENAIRE_PREFIX = "openaire____"
val SEPARATOR = "::"
val DNET_LANGUAGES = "dnet:languages"
val PID_TYPES = "dnet:pid_types"
val mappingCrossrefType = Map( val mappingCrossrefType = Map(
"book-section" -> "publication", "book-section" -> "publication",
@ -116,7 +106,7 @@ case object Crossref2Oaf {
result.setLastupdatetimestamp((json \ "indexed" \ "timestamp").extract[Long]) result.setLastupdatetimestamp((json \ "indexed" \ "timestamp").extract[Long])
result.setDateofcollection((json \ "indexed" \ "date-time").extract[String]) result.setDateofcollection((json \ "indexed" \ "date-time").extract[String])
result.setCollectedfrom(List(createCollectedFrom()).asJava) result.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava)
// Publisher ( Name of work's publisher mapped into Result/Publisher) // Publisher ( Name of work's publisher mapped into Result/Publisher)
val publisher = (json \ "publisher").extractOrElse[String](null) val publisher = (json \ "publisher").extractOrElse[String](null)
@ -168,7 +158,7 @@ case object Crossref2Oaf {
result.setInstance(List(instance).asJava) result.setInstance(List(instance).asJava)
instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource")) instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
instance.setCollectedfrom(createCollectedFrom()) instance.setCollectedfrom(createCrossrefCollectedFrom())
if (StringUtils.isNotBlank(issuedDate)) { if (StringUtils.isNotBlank(issuedDate)) {
instance.setDateofacceptance(asField(issuedDate)) instance.setDateofacceptance(asField(issuedDate))
} }
@ -215,7 +205,7 @@ case object Crossref2Oaf {
val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List()) val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List())
if (funderList.nonEmpty) { if (funderList.nonEmpty) {
resultList = resultList ::: mappingFunderToRelations(funderList, result.getId, createCollectedFrom(), result.getDataInfo, result.getLastupdatetimestamp) resultList = resultList ::: mappingFunderToRelations(funderList, result.getId, createCrossrefCollectedFrom(), result.getDataInfo, result.getLastupdatetimestamp)
} }
@ -416,71 +406,8 @@ case object Crossref2Oaf {
} }
def generateIdentifier(oaf: Result, doi: String): String = {
val id = DHPUtils.md5(doi.toLowerCase)
if (oaf.isInstanceOf[Dataset])
return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}"
s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}"
}
def asField[T](value: T): Field[T] = {
val tmp = new Field[T]
tmp.setValue(value)
tmp
}
def generateDataInfo(): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
di.setTrust("0.9")
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
di
}
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
sp.setValue(value)
sp
}
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
sp.setValue(value)
sp.setDataInfo(dataInfo)
sp
}
def createCollectedFrom(): KeyValue = {
val cf = new KeyValue
cf.setValue(CROSSREF)
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5("crossref"))
cf
}
def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
val q = new Qualifier
q.setClassid(clsName)
q.setClassname(clsValue)
q.setSchemeid(schName)
q.setSchemename(schValue)
q
}
def createQualifier(cls: String, sch: String): Qualifier = {
createQualifier(cls, cls, sch, sch)
}
def generateItemFromType(objectType: String, objectSubType: String): Result = { def generateItemFromType(objectType: String, objectSubType: String): Result = {

View File

@ -1,12 +1,18 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
case class Papers(PaperId:Long, Rank:Integer, Doi:String, case class MagPapers(PaperId: Long, Rank: Integer, Doi: String,
DocType: String, PaperTitle: String, OriginalTitle: String, DocType: String, PaperTitle: String, OriginalTitle: String,
BookTitle: String, Year: Option[Integer], Date: Option[java.sql.Timestamp], Publisher: String, BookTitle: String, Year: Option[Integer], Date: Option[java.sql.Timestamp], Publisher: String,
JournalId: Option[Long], ConferenceSeriesId: Option[Long], ConferenceInstanceId: Option[Long], JournalId: Option[Long], ConferenceSeriesId: Option[Long], ConferenceInstanceId: Option[Long],
@ -15,28 +21,182 @@ case class Papers(PaperId:Long, Rank:Integer, Doi:String,
OriginalVenue: String, FamilyId: Option[Long], CreatedDate: java.sql.Timestamp) {} OriginalVenue: String, FamilyId: Option[Long], CreatedDate: java.sql.Timestamp) {}
case class PaperAbstract(PaperId:Long,IndexedAbstract:String) {} case class MagPaperAbstract(PaperId: Long, IndexedAbstract: String) {}
case class MagAuthor(AuthorId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], LastKnownAffiliationId: Option[Long], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {}
case class MagAffiliation(AffiliationId: Long, Rank: Int, NormalizedName: String, DisplayName: String, GridId: String, OfficialPage: String, WikiPage: String, PaperCount: Long, CitationCount: Long, Latitude: Option[Float], Longitude: Option[Float], CreatedDate: java.sql.Timestamp) {}
case class MagPaperAuthorAffiliation(PaperId: Long, AuthorId: Long, AffiliationId: Option[Long], AuthorSequenceNumber: Int, OriginalAuthor: String, OriginalAffiliation: String) {}
case class MagAuthorAffiliation(author: MagAuthor, affiliation:String)
case class MagPaperWithAuthorList(PaperId: Long, authors: List[MagAuthorAffiliation]) {}
case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String) {}
case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option[String], LanguageCode: Option[String]) {}
case class MagUrl(PaperId: Long, instances: List[String])
case class MagJournal(JournalId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], Issn: Option[String], Publisher: Option[String], Webpage: Option[String], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {}
case object ConversionUtil { case object ConversionUtil {
def extractMagIdentifier(pids:mutable.Buffer[String]) :String ={
val magIDRegex: Regex = "^[0-9]+$".r
val s =pids.filter(p=> magIDRegex.findAllIn(p).hasNext)
if (s.nonEmpty)
def transformPaperAbstract(input:PaperAbstract) : PaperAbstract = { return s.head
PaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract)) null
} }
def addInstances(a: (Publication, MagUrl)): Publication = {
val pub = a._1
val urls = a._2
val i = new Instance
if (urls!= null) {
val l:List[String] = urls.instances.filter(k=>k.nonEmpty):::List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}")
i.setUrl(l.asJava)
}
else
i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava)
i.setCollectedfrom(createMAGCollectedFrom())
pub.setInstance(List(i).asJava)
pub
}
def transformPaperAbstract(input: MagPaperAbstract): MagPaperAbstract = {
MagPaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract))
}
def createOAFFromJournalAuthorPaper(inputParams: ((MagPapers, MagJournal), MagPaperWithAuthorList)): Publication = {
val paper = inputParams._1._1
val journal = inputParams._1._2
val authors = inputParams._2
val pub = new Publication
pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
//Set identifier as {50|60} | doiboost____::md5(DOI)
pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title")
pub.setTitle(List(mainTitles, originalTitles).asJava)
pub.setSource(List(asField(paper.BookTitle)).asJava)
val authorsOAF = authors.authors.map { f: MagAuthorAffiliation =>
val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author
a.setFullname(f.author.DisplayName.get)
if(f.affiliation!= null)
a.setAffiliation(List(asField(f.affiliation)).asJava)
a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava)
a
}
pub.setAuthor(authorsOAF.asJava)
if (paper.Date != null && paper.Date.isDefined) {
pub.setDateofacceptance(asField(paper.Date.get.toString))
}
pub.setPublisher(asField(paper.Publisher))
if (journal != null && journal.DisplayName.isDefined) {
val j = new Journal
j.setName(journal.DisplayName.get)
j.setSp(paper.FirstPage)
j.setEp(paper.LastPage)
if (journal.Publisher.isDefined)
j.setEdition(journal.Publisher.get)
if (journal.Issn.isDefined)
j.setIssnPrinted(journal.Issn.get)
pub.setJournal(j)
}
pub
}
def createOAF(inputParams: ((MagPapers, MagPaperWithAuthorList), MagPaperAbstract)): Publication = {
val paper = inputParams._1._1
val authors = inputParams._1._2
val description = inputParams._2
val pub = new Publication
pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
//Set identifier as {50|60} | doiboost____::md5(DOI)
pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title")
pub.setTitle(List(mainTitles, originalTitles).asJava)
pub.setSource(List(asField(paper.BookTitle)).asJava)
if (description != null) {
pub.setDescription(List(asField(description.IndexedAbstract)).asJava)
}
val authorsOAF = authors.authors.map { f: MagAuthorAffiliation =>
val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author
a.setFullname(f.author.DisplayName.get)
if(f.affiliation!= null)
a.setAffiliation(List(asField(f.affiliation)).asJava)
a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava)
a
}
if (paper.Date != null) {
pub.setDateofacceptance(asField(paper.Date.toString))
}
pub.setAuthor(authorsOAF.asJava)
pub
}
def convertInvertedIndexString(json_input: String): String = { def convertInvertedIndexString(json_input: String): String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(json_input) lazy val json: json4s.JValue = parse(json_input)
val idl = (json \ "IndexLength").extract[Int] val idl = (json \ "IndexLength").extract[Int]
if (idl > 0) { if (idl > 0) {
val res = Array.ofDim[String](idl) val res = Array.ofDim[String](idl)
@ -45,8 +205,11 @@ case object ConversionUtil {
for {(k: String, v: List[Int]) <- iid} { for {(k: String, v: List[Int]) <- iid} {
v.foreach(item => res(item) = k) v.foreach(item => res(item) = k)
} }
(0 until idl).foreach(i => {
if (res(i) == null)
res(i) = ""
})
return res.mkString(" ") return res.mkString(" ")
} }
"" ""
} }

View File

@ -1,13 +1,17 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.DoiBoostMappingUtil.asField
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._
import scala.collection.JavaConverters._
object SparkPreProcessMAG { object SparkPreProcessMAG {
@ -23,15 +27,21 @@ object SparkPreProcessMAG {
.config(conf) .config(conf)
.appName(getClass.getSimpleName) .appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
import spark.implicits._ import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
implicit val tupleForJoinEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
logger.info("Phase 1) make uninque DOI in Papers:") logger.info("Phase 1) make uninque DOI in Papers:")
val d: Dataset[Papers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[Papers] val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers]
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: Papers, p2: Papers) => val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) =>
var r = if (p1 == null) p2 else p1 var r = if (p1 == null) p2 else p1
if (p1 != null && p2 != null) { if (p1 != null && p2 != null) {
if (p1.CreatedDate != null && p2.CreatedDate != null) { if (p1.CreatedDate != null && p2.CreatedDate != null) {
@ -46,16 +56,83 @@ object SparkPreProcessMAG {
r r
}.map(_._2) }.map(_._2)
val distinctPaper: Dataset[Papers] = spark.createDataset(result) val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
logger.info(s"Total number of element: ${result.count()}") logger.info(s"Total number of element: ${result.count()}")
logger.info("Phase 2) convert InverdIndex Abastrac to string") logger.info("Phase 3) Group Author by PaperId")
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[PaperAbstract] val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor]
val affiliation =spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation]
val paperAuthorAffiliation =spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation]
paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId")))
.map{case (a:MagPaperAuthorAffiliation,b:MagAuthor )=> (a.AffiliationId,MagPaperAuthorDenormalized(a.PaperId, b, null)) }
.joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left")
.map(s => {
val mpa = s._1._2
val af = s._2
if (af!= null) {
MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName)
} else
mpa
}).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors")
logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal]
val papers =spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers]
val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList]
val firstJoin =papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")),"left")
firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left")
.map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2")
var magPubs:Dataset[(String,Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)]
val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
logger.info("Phase 5) enrich publication with URL and Instances")
magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
.map{a:((String,Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2))}
.write.mode(SaveMode.Overwrite)
.save(s"${parser.get("targetPath")}/merge_step_3")
logger.info("Phase 6) Enrich Publication with description")
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
val paperAbstract =spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract]
distinctPaper.joinWith(pa, col("PaperId").eqia)
magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)]
magPubs.joinWith(paperAbstract,col("_1").equalTo(paperAbstract("PaperId")), "left").map(p=>
{
val pub = p._1._2
val abst = p._2
if (abst!= null) {
pub.setDescription(List(asField(abst.IndexedAbstract)).asJava)
}
pub
}
).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4")
} }

View File

@ -72,6 +72,7 @@
--executor-memory=${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT} ${sparkExtraOPT}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>

View File

@ -11,6 +11,10 @@
<name>queueName</name> <name>queueName</name>
<value>default</value> <value>default</value>
</property> </property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property> <property>
<name>oozie.action.sharelib.for.spark</name> <name>oozie.action.sharelib.for.spark</name>
<value>spark2</value> <value>spark2</value>

View File

@ -16,6 +16,10 @@
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<description>number of cores used by single executor</description> <description>number of cores used by single executor</description>
</property> </property>
<property>
<name>outputPath</name>
<description>the working dir base path</description>
</property>
</parameters> </parameters>
<start to="ResetWorkingPath"/> <start to="ResetWorkingPath"/>
@ -47,6 +51,7 @@
</spark-opts> </spark-opts>
<arg>-mt</arg> <arg>yarn</arg> <arg>-mt</arg> <arg>yarn</arg>
<arg>--workingPath_orcid</arg><arg>${workingPath_activities}/</arg> <arg>--workingPath_orcid</arg><arg>${workingPath_activities}/</arg>
<arg>-o</arg><arg>${outputPath}/</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -1,10 +1,21 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.doiboost.mag
import org.codehaus.jackson.map.ObjectMapper import eu.dnetlib.dhp.schema.oaf.Publication
import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature
import org.apache.spark.SparkConf
import org.apache.spark.api.java.function.MapFunction
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.apache.spark.sql.functions._
import scala.collection.JavaConverters._
import scala.io.Source import scala.io.Source
import scala.reflect.ClassTag
import scala.util.matching.Regex
class MAGMappingTest { class MAGMappingTest {
@ -13,9 +24,44 @@ class MAGMappingTest {
val mapper = new ObjectMapper() val mapper = new ObjectMapper()
//@Test @Test
def testMAGCSV(): Unit = { def testMAGCSV(): Unit = {
SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" ")) // SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" "))
val sparkConf: SparkConf = new SparkConf
val spark: SparkSession = SparkSession.builder()
.config(sparkConf)
.appName(getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
implicit val longBarEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
val sourcePath = "/data/doiboost/mag/input"
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
val magOAF = spark.read.load("$sourcePath/merge_step_4").as[Publication]
println(magOAF.first().getOriginalId)
magOAF.map(k => (ConversionUtil.extractMagIdentifier(k.getOriginalId.asScala),k)).as[(String,Publication)].show()
println((ConversionUtil.extractMagIdentifier(magOAF.first().getOriginalId.asScala)))
val magIDRegex: Regex = "^[0-9]+$".r
println(magIDRegex.findFirstMatchIn("suca").isDefined)
} }
@ -32,3 +78,5 @@ class MAGMappingTest {
} }