From 486e850bcc24c88ec3b36747ccae3955c01455b5 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 19 May 2020 09:24:45 +0200 Subject: [PATCH] next step of MAG conversion implemented --- .../doiboost/DoiBoostMappingUtil.scala | 104 +++++++++ .../doiboost/crossref/Crossref2Oaf.scala | 83 +------- .../dnetlib/doiboost/mag/MagDataModel.scala | 199 ++++++++++++++++-- .../doiboost/mag/SparkPreProcessMAG.scala | 91 +++++++- .../dhp/doiboost/mag/oozie_app/workflow.xml | 1 + .../oozie_app/config-default.xml | 4 + .../orcid_gen_authors/oozie_app/workflow.xml | 5 + .../dnetlib/doiboost/mag/MAGMappingTest.scala | 56 ++++- 8 files changed, 436 insertions(+), 107 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala new file mode 100644 index 000000000..68a3231e0 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -0,0 +1,104 @@ +package eu.dnetlib.doiboost + +import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, KeyValue, Qualifier, Result, StructuredProperty} +import eu.dnetlib.dhp.utils.DHPUtils + +object DoiBoostMappingUtil { + + //STATIC STRING + val MAG = "microsoft" + val ORCID = "ORCID" + val CROSSREF = "Crossref" + val UNPAYWALL = "UnpayWall" + val GRID_AC = "grid.ac" + val WIKPEDIA = "wikpedia" + val doiBoostNSPREFIX = "doiboost____" + val OPENAIRE_PREFIX = "openaire____" + val SEPARATOR = "::" + val DNET_LANGUAGES = "dnet:languages" + val PID_TYPES = "dnet:pid_types" + + + + def generateDataInfo(): DataInfo = { + val di = new DataInfo + di.setDeletedbyinference(false) + di.setInferred(false) + di.setInvisible(false) + di.setTrust("0.9") + di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions")) + di + } + + + def createSP(value: String, classId: String, schemeId: String): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId, schemeId)) + sp.setValue(value) + sp + + } + + def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId, schemeId)) + sp.setValue(value) + sp.setDataInfo(dataInfo) + sp + + } + + def createCrossrefCollectedFrom(): KeyValue = { + + val cf = new KeyValue + cf.setValue(CROSSREF) + cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5("crossref")) + cf + + } + + def generateIdentifier(oaf: Result, doi: String): String = { + val id = DHPUtils.md5(doi.toLowerCase) + if (oaf.isInstanceOf[Dataset]) + return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}" + s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}" + } + + + + + + def createMAGCollectedFrom(): KeyValue = { + + val cf = new KeyValue + cf.setValue(MAG) + cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(MAG)) + cf + + } + + def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = { + val q = new Qualifier + q.setClassid(clsName) + q.setClassname(clsValue) + q.setSchemeid(schName) + q.setSchemename(schValue) + q + } + + def createQualifier(cls: String, sch: String): Qualifier = { + createQualifier(cls, cls, sch, sch) + } + + + def asField[T](value: T): Field[T] = { + val tmp = new Field[T] + tmp.setValue(value) + tmp + + + } + + + +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 2d3b9a43a..eda3bf17a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -14,6 +14,7 @@ import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ case class mappingAffiliation(name: String) {} @@ -25,18 +26,7 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S case object Crossref2Oaf { val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass) - //STATIC STRING - val MAG = "MAG" - val ORCID = "ORCID" - val CROSSREF = "Crossref" - val UNPAYWALL = "UnpayWall" - val GRID_AC = "grid.ac" - val WIKPEDIA = "wikpedia" - val doiBoostNSPREFIX = "doiboost____" - val OPENAIRE_PREFIX = "openaire____" - val SEPARATOR = "::" - val DNET_LANGUAGES = "dnet:languages" - val PID_TYPES = "dnet:pid_types" + val mappingCrossrefType = Map( "book-section" -> "publication", @@ -116,7 +106,7 @@ case object Crossref2Oaf { result.setLastupdatetimestamp((json \ "indexed" \ "timestamp").extract[Long]) result.setDateofcollection((json \ "indexed" \ "date-time").extract[String]) - result.setCollectedfrom(List(createCollectedFrom()).asJava) + result.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava) // Publisher ( Name of work's publisher mapped into Result/Publisher) val publisher = (json \ "publisher").extractOrElse[String](null) @@ -168,7 +158,7 @@ case object Crossref2Oaf { result.setInstance(List(instance).asJava) instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource")) - instance.setCollectedfrom(createCollectedFrom()) + instance.setCollectedfrom(createCrossrefCollectedFrom()) if (StringUtils.isNotBlank(issuedDate)) { instance.setDateofacceptance(asField(issuedDate)) } @@ -215,7 +205,7 @@ case object Crossref2Oaf { val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List()) if (funderList.nonEmpty) { - resultList = resultList ::: mappingFunderToRelations(funderList, result.getId, createCollectedFrom(), result.getDataInfo, result.getLastupdatetimestamp) + resultList = resultList ::: mappingFunderToRelations(funderList, result.getId, createCrossrefCollectedFrom(), result.getDataInfo, result.getLastupdatetimestamp) } @@ -416,71 +406,8 @@ case object Crossref2Oaf { } - def generateIdentifier(oaf: Result, doi: String): String = { - val id = DHPUtils.md5(doi.toLowerCase) - if (oaf.isInstanceOf[Dataset]) - return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}" - s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}" - } - - def asField[T](value: T): Field[T] = { - val tmp = new Field[T] - tmp.setValue(value) - tmp - } - - - def generateDataInfo(): DataInfo = { - val di = new DataInfo - di.setDeletedbyinference(false) - di.setInferred(false) - di.setInvisible(false) - di.setTrust("0.9") - di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions")) - di - } - - - def createSP(value: String, classId: String, schemeId: String): StructuredProperty = { - val sp = new StructuredProperty - sp.setQualifier(createQualifier(classId, schemeId)) - sp.setValue(value) - sp - - } - - def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = { - val sp = new StructuredProperty - sp.setQualifier(createQualifier(classId, schemeId)) - sp.setValue(value) - sp.setDataInfo(dataInfo) - sp - - } - - def createCollectedFrom(): KeyValue = { - - val cf = new KeyValue - cf.setValue(CROSSREF) - cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5("crossref")) - cf - - } - - def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = { - val q = new Qualifier - q.setClassid(clsName) - q.setClassname(clsValue) - q.setSchemeid(schName) - q.setSchemename(schValue) - q - } - - def createQualifier(cls: String, sch: String): Qualifier = { - createQualifier(cls, cls, sch, sch) - } def generateItemFromType(objectType: String, objectSubType: String): Result = { diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 189e90ed9..17f0395ca 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -1,52 +1,215 @@ package eu.dnetlib.doiboost.mag +import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication} import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.matching.Regex -case class Papers(PaperId:Long, Rank:Integer, Doi:String, - DocType:String, PaperTitle:String, OriginalTitle:String, - BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String, - JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long], - Volume:String, Issue:String, FirstPage:String, LastPage:String, - ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long], - OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {} +case class MagPapers(PaperId: Long, Rank: Integer, Doi: String, + DocType: String, PaperTitle: String, OriginalTitle: String, + BookTitle: String, Year: Option[Integer], Date: Option[java.sql.Timestamp], Publisher: String, + JournalId: Option[Long], ConferenceSeriesId: Option[Long], ConferenceInstanceId: Option[Long], + Volume: String, Issue: String, FirstPage: String, LastPage: String, + ReferenceCount: Option[Long], CitationCount: Option[Long], EstimatedCitation: Option[Long], + OriginalVenue: String, FamilyId: Option[Long], CreatedDate: java.sql.Timestamp) {} -case class PaperAbstract(PaperId:Long,IndexedAbstract:String) {} +case class MagPaperAbstract(PaperId: Long, IndexedAbstract: String) {} +case class MagAuthor(AuthorId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], LastKnownAffiliationId: Option[Long], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {} + +case class MagAffiliation(AffiliationId: Long, Rank: Int, NormalizedName: String, DisplayName: String, GridId: String, OfficialPage: String, WikiPage: String, PaperCount: Long, CitationCount: Long, Latitude: Option[Float], Longitude: Option[Float], CreatedDate: java.sql.Timestamp) {} + +case class MagPaperAuthorAffiliation(PaperId: Long, AuthorId: Long, AffiliationId: Option[Long], AuthorSequenceNumber: Int, OriginalAuthor: String, OriginalAffiliation: String) {} + + +case class MagAuthorAffiliation(author: MagAuthor, affiliation:String) + +case class MagPaperWithAuthorList(PaperId: Long, authors: List[MagAuthorAffiliation]) {} + +case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String) {} + +case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option[String], LanguageCode: Option[String]) {} + +case class MagUrl(PaperId: Long, instances: List[String]) + + +case class MagJournal(JournalId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], Issn: Option[String], Publisher: Option[String], Webpage: Option[String], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {} case object ConversionUtil { + def extractMagIdentifier(pids:mutable.Buffer[String]) :String ={ + val magIDRegex: Regex = "^[0-9]+$".r + val s =pids.filter(p=> magIDRegex.findAllIn(p).hasNext) - - def transformPaperAbstract(input:PaperAbstract) : PaperAbstract = { - PaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract)) + if (s.nonEmpty) + return s.head + null } - def convertInvertedIndexString(json_input:String) :String = { + def addInstances(a: (Publication, MagUrl)): Publication = { + val pub = a._1 + val urls = a._2 + + + val i = new Instance + + + if (urls!= null) { + + val l:List[String] = urls.instances.filter(k=>k.nonEmpty):::List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}") + + i.setUrl(l.asJava) + } + else + i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava) + + i.setCollectedfrom(createMAGCollectedFrom()) + pub.setInstance(List(i).asJava) + pub + } + + + def transformPaperAbstract(input: MagPaperAbstract): MagPaperAbstract = { + MagPaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract)) + } + + + def createOAFFromJournalAuthorPaper(inputParams: ((MagPapers, MagJournal), MagPaperWithAuthorList)): Publication = { + val paper = inputParams._1._1 + val journal = inputParams._1._2 + val authors = inputParams._2 + + val pub = new Publication + pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava) + pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava) + + //Set identifier as {50|60} | doiboost____::md5(DOI) + pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase)) + + val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title") + val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title") + pub.setTitle(List(mainTitles, originalTitles).asJava) + + pub.setSource(List(asField(paper.BookTitle)).asJava) + + val authorsOAF = authors.authors.map { f: MagAuthorAffiliation => + + val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author + + a.setFullname(f.author.DisplayName.get) + + if(f.affiliation!= null) + a.setAffiliation(List(asField(f.affiliation)).asJava) + a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava) + a + } + pub.setAuthor(authorsOAF.asJava) + + + if (paper.Date != null && paper.Date.isDefined) { + pub.setDateofacceptance(asField(paper.Date.get.toString)) + } + pub.setPublisher(asField(paper.Publisher)) + + + if (journal != null && journal.DisplayName.isDefined) { + val j = new Journal + + j.setName(journal.DisplayName.get) + j.setSp(paper.FirstPage) + j.setEp(paper.LastPage) + if (journal.Publisher.isDefined) + j.setEdition(journal.Publisher.get) + if (journal.Issn.isDefined) + j.setIssnPrinted(journal.Issn.get) + pub.setJournal(j) + } + pub + } + + + def createOAF(inputParams: ((MagPapers, MagPaperWithAuthorList), MagPaperAbstract)): Publication = { + + val paper = inputParams._1._1 + val authors = inputParams._1._2 + val description = inputParams._2 + + val pub = new Publication + pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava) + pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava) + + //Set identifier as {50|60} | doiboost____::md5(DOI) + pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase)) + + val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title") + val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title") + pub.setTitle(List(mainTitles, originalTitles).asJava) + + pub.setSource(List(asField(paper.BookTitle)).asJava) + + + if (description != null) { + pub.setDescription(List(asField(description.IndexedAbstract)).asJava) + } + + + val authorsOAF = authors.authors.map { f: MagAuthorAffiliation => + + val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author + + a.setFullname(f.author.DisplayName.get) + + if(f.affiliation!= null) + a.setAffiliation(List(asField(f.affiliation)).asJava) + + + a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava) + + a + + } + + + if (paper.Date != null) { + pub.setDateofacceptance(asField(paper.Date.toString)) + } + + pub.setAuthor(authorsOAF.asJava) + + + pub + + } + + + def convertInvertedIndexString(json_input: String): String = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(json_input) - - - val idl = (json \ "IndexLength").extract[Int] - if (idl > 0) { val res = Array.ofDim[String](idl) val iid = (json \ "InvertedIndex").extract[Map[String, List[Int]]] - for {(k:String,v:List[Int]) <- iid}{ + for {(k: String, v: List[Int]) <- iid} { v.foreach(item => res(item) = k) } + (0 until idl).foreach(i => { + if (res(i) == null) + res(i) = "" + }) return res.mkString(" ") - } "" } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index 4c014a95c..a0e20be1a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -1,13 +1,17 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.schema.oaf.Publication +import eu.dnetlib.doiboost.DoiBoostMappingUtil.asField import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} import org.apache.spark.sql.functions._ +import scala.collection.JavaConverters._ + object SparkPreProcessMAG { @@ -23,15 +27,21 @@ object SparkPreProcessMAG { .config(conf) .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() + + val sourcePath = parser.get("sourcePath") import spark.implicits._ + implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] + implicit val tupleForJoinEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs) + + logger.info("Phase 1) make uninque DOI in Papers:") - val d: Dataset[Papers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[Papers] + val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one - val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: Papers, p2: Papers) => + val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) => var r = if (p1 == null) p2 else p1 if (p1 != null && p2 != null) { if (p1.CreatedDate != null && p2.CreatedDate != null) { @@ -46,16 +56,83 @@ object SparkPreProcessMAG { r }.map(_._2) - val distinctPaper: Dataset[Papers] = spark.createDataset(result) + val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") logger.info(s"Total number of element: ${result.count()}") - logger.info("Phase 2) convert InverdIndex Abastrac to string") - val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[PaperAbstract] + logger.info("Phase 3) Group Author by PaperId") + val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] + + val affiliation =spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation] + + val paperAuthorAffiliation =spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation] + + + paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId"))) + .map{case (a:MagPaperAuthorAffiliation,b:MagAuthor )=> (a.AffiliationId,MagPaperAuthorDenormalized(a.PaperId, b, null)) } + .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left") + .map(s => { + val mpa = s._1._2 + val af = s._2 + if (af!= null) { + MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName) + } else + mpa + }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) + .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors") + + + + logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") + + + val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal] + + val papers =spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers] + + val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList] + + + + val firstJoin =papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")),"left") + firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") + .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") + + + + var magPubs:Dataset[(String,Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)] + + val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] + + + logger.info("Phase 5) enrich publication with URL and Instances") + + magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") + .map{a:((String,Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2))} + .write.mode(SaveMode.Overwrite) + .save(s"${parser.get("targetPath")}/merge_step_3") + + + + logger.info("Phase 6) Enrich Publication with description") + val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") + val paperAbstract =spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] - distinctPaper.joinWith(pa, col("PaperId").eqia) + + magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)] + + magPubs.joinWith(paperAbstract,col("_1").equalTo(paperAbstract("PaperId")), "left").map(p=> + { + val pub = p._1._2 + val abst = p._2 + if (abst!= null) { + pub.setDescription(List(asField(abst.IndexedAbstract)).asJava) + } + pub + } + ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml index ba6eea364..2277b79b0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml @@ -72,6 +72,7 @@ --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 ${sparkExtraOPT} --sourcePath${sourcePath} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml index 3726022cb..a720e7592 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml @@ -11,6 +11,10 @@ queueName default + + oozie.use.system.libpath + true + oozie.action.sharelib.for.spark spark2 diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml index a4d65ed00..f258fae6e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml @@ -16,6 +16,10 @@ sparkExecutorCores number of cores used by single executor + + outputPath + the working dir base path + @@ -47,6 +51,7 @@ -mt yarn --workingPath_orcid${workingPath_activities}/ + -o${outputPath}/ diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index 0aaaeb377..4d26969dd 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -1,10 +1,21 @@ package eu.dnetlib.doiboost.mag -import org.codehaus.jackson.map.ObjectMapper +import eu.dnetlib.dhp.schema.oaf.Publication +import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature +import org.apache.spark.SparkConf +import org.apache.spark.api.java.function.MapFunction +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} import org.junit.jupiter.api.Test import org.slf4j.{Logger, LoggerFactory} import org.junit.jupiter.api.Assertions._ +import org.apache.spark.sql.functions._ + +import scala.collection.JavaConverters._ import scala.io.Source +import scala.reflect.ClassTag +import scala.util.matching.Regex + class MAGMappingTest { @@ -13,14 +24,49 @@ class MAGMappingTest { val mapper = new ObjectMapper() - //@Test + @Test def testMAGCSV(): Unit = { - SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" ")) + // SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" ")) + + val sparkConf: SparkConf = new SparkConf + + val spark: SparkSession = SparkSession.builder() + .config(sparkConf) + .appName(getClass.getSimpleName) + .master("local[*]") + .getOrCreate() + + import spark.implicits._ + + + implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] + implicit val longBarEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs) + + val sourcePath = "/data/doiboost/mag/input" + + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) + + + val magOAF = spark.read.load("$sourcePath/merge_step_4").as[Publication] + + println(magOAF.first().getOriginalId) + + + magOAF.map(k => (ConversionUtil.extractMagIdentifier(k.getOriginalId.asScala),k)).as[(String,Publication)].show() + + + println((ConversionUtil.extractMagIdentifier(magOAF.first().getOriginalId.asScala))) + + val magIDRegex: Regex = "^[0-9]+$".r + + + println(magIDRegex.findFirstMatchIn("suca").isDefined) + } @Test - def buildInvertedIndexTest() :Unit = { + def buildInvertedIndexTest(): Unit = { val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString val description = ConversionUtil.convertInvertedIndexString(json_input) assertNotNull(description) @@ -32,3 +78,5 @@ class MAGMappingTest { } + +