diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
new file mode 100644
index 000000000..68a3231e0
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
@@ -0,0 +1,104 @@
+package eu.dnetlib.doiboost
+
+import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, KeyValue, Qualifier, Result, StructuredProperty}
+import eu.dnetlib.dhp.utils.DHPUtils
+
+object DoiBoostMappingUtil {
+
+ //STATIC STRING
+ val MAG = "microsoft"
+ val ORCID = "ORCID"
+ val CROSSREF = "Crossref"
+ val UNPAYWALL = "UnpayWall"
+ val GRID_AC = "grid.ac"
+ val WIKPEDIA = "wikpedia"
+ val doiBoostNSPREFIX = "doiboost____"
+ val OPENAIRE_PREFIX = "openaire____"
+ val SEPARATOR = "::"
+ val DNET_LANGUAGES = "dnet:languages"
+ val PID_TYPES = "dnet:pid_types"
+
+
+
+ def generateDataInfo(): DataInfo = {
+ val di = new DataInfo
+ di.setDeletedbyinference(false)
+ di.setInferred(false)
+ di.setInvisible(false)
+ di.setTrust("0.9")
+ di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
+ di
+ }
+
+
+ def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId, schemeId))
+ sp.setValue(value)
+ sp
+
+ }
+
+ def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId, schemeId))
+ sp.setValue(value)
+ sp.setDataInfo(dataInfo)
+ sp
+
+ }
+
+ def createCrossrefCollectedFrom(): KeyValue = {
+
+ val cf = new KeyValue
+ cf.setValue(CROSSREF)
+ cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5("crossref"))
+ cf
+
+ }
+
+ def generateIdentifier(oaf: Result, doi: String): String = {
+ val id = DHPUtils.md5(doi.toLowerCase)
+ if (oaf.isInstanceOf[Dataset])
+ return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}"
+ s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}"
+ }
+
+
+
+
+
+ def createMAGCollectedFrom(): KeyValue = {
+
+ val cf = new KeyValue
+ cf.setValue(MAG)
+ cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(MAG))
+ cf
+
+ }
+
+ def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
+ val q = new Qualifier
+ q.setClassid(clsName)
+ q.setClassname(clsValue)
+ q.setSchemeid(schName)
+ q.setSchemename(schValue)
+ q
+ }
+
+ def createQualifier(cls: String, sch: String): Qualifier = {
+ createQualifier(cls, cls, sch, sch)
+ }
+
+
+ def asField[T](value: T): Field[T] = {
+ val tmp = new Field[T]
+ tmp.setValue(value)
+ tmp
+
+
+ }
+
+
+
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index 2d3b9a43a..eda3bf17a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -14,6 +14,7 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
+import eu.dnetlib.doiboost.DoiBoostMappingUtil._
case class mappingAffiliation(name: String) {}
@@ -25,18 +26,7 @@ case class mappingFunder(name: String, DOI: Option[String], award: Option[List[S
case object Crossref2Oaf {
val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
- //STATIC STRING
- val MAG = "MAG"
- val ORCID = "ORCID"
- val CROSSREF = "Crossref"
- val UNPAYWALL = "UnpayWall"
- val GRID_AC = "grid.ac"
- val WIKPEDIA = "wikpedia"
- val doiBoostNSPREFIX = "doiboost____"
- val OPENAIRE_PREFIX = "openaire____"
- val SEPARATOR = "::"
- val DNET_LANGUAGES = "dnet:languages"
- val PID_TYPES = "dnet:pid_types"
+
val mappingCrossrefType = Map(
"book-section" -> "publication",
@@ -116,7 +106,7 @@ case object Crossref2Oaf {
result.setLastupdatetimestamp((json \ "indexed" \ "timestamp").extract[Long])
result.setDateofcollection((json \ "indexed" \ "date-time").extract[String])
- result.setCollectedfrom(List(createCollectedFrom()).asJava)
+ result.setCollectedfrom(List(createCrossrefCollectedFrom()).asJava)
// Publisher ( Name of work's publisher mapped into Result/Publisher)
val publisher = (json \ "publisher").extractOrElse[String](null)
@@ -168,7 +158,7 @@ case object Crossref2Oaf {
result.setInstance(List(instance).asJava)
instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
- instance.setCollectedfrom(createCollectedFrom())
+ instance.setCollectedfrom(createCrossrefCollectedFrom())
if (StringUtils.isNotBlank(issuedDate)) {
instance.setDateofacceptance(asField(issuedDate))
}
@@ -215,7 +205,7 @@ case object Crossref2Oaf {
val funderList: List[mappingFunder] = (json \ "funder").extractOrElse[List[mappingFunder]](List())
if (funderList.nonEmpty) {
- resultList = resultList ::: mappingFunderToRelations(funderList, result.getId, createCollectedFrom(), result.getDataInfo, result.getLastupdatetimestamp)
+ resultList = resultList ::: mappingFunderToRelations(funderList, result.getId, createCrossrefCollectedFrom(), result.getDataInfo, result.getLastupdatetimestamp)
}
@@ -416,71 +406,8 @@ case object Crossref2Oaf {
}
- def generateIdentifier(oaf: Result, doi: String): String = {
- val id = DHPUtils.md5(doi.toLowerCase)
- if (oaf.isInstanceOf[Dataset])
- return s"60|${doiBoostNSPREFIX}${SEPARATOR}${id}"
- s"50|${doiBoostNSPREFIX}${SEPARATOR}${id}"
- }
-
- def asField[T](value: T): Field[T] = {
- val tmp = new Field[T]
- tmp.setValue(value)
- tmp
- }
-
-
- def generateDataInfo(): DataInfo = {
- val di = new DataInfo
- di.setDeletedbyinference(false)
- di.setInferred(false)
- di.setInvisible(false)
- di.setTrust("0.9")
- di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
- di
- }
-
-
- def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
- val sp = new StructuredProperty
- sp.setQualifier(createQualifier(classId, schemeId))
- sp.setValue(value)
- sp
-
- }
-
- def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
- val sp = new StructuredProperty
- sp.setQualifier(createQualifier(classId, schemeId))
- sp.setValue(value)
- sp.setDataInfo(dataInfo)
- sp
-
- }
-
- def createCollectedFrom(): KeyValue = {
-
- val cf = new KeyValue
- cf.setValue(CROSSREF)
- cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5("crossref"))
- cf
-
- }
-
- def createQualifier(clsName: String, clsValue: String, schName: String, schValue: String): Qualifier = {
- val q = new Qualifier
- q.setClassid(clsName)
- q.setClassname(clsValue)
- q.setSchemeid(schName)
- q.setSchemename(schValue)
- q
- }
-
- def createQualifier(cls: String, sch: String): Qualifier = {
- createQualifier(cls, cls, sch, sch)
- }
def generateItemFromType(objectType: String, objectSubType: String): Result = {
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 189e90ed9..17f0395ca 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -1,52 +1,215 @@
package eu.dnetlib.doiboost.mag
+import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
+import eu.dnetlib.doiboost.DoiBoostMappingUtil._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.matching.Regex
-case class Papers(PaperId:Long, Rank:Integer, Doi:String,
- DocType:String, PaperTitle:String, OriginalTitle:String,
- BookTitle:String, Year:Option[Integer], Date:Option[java.sql.Timestamp], Publisher:String,
- JournalId:Option[Long], ConferenceSeriesId:Option[Long], ConferenceInstanceId:Option[Long],
- Volume:String, Issue:String, FirstPage:String, LastPage:String,
- ReferenceCount:Option[Long], CitationCount:Option[Long], EstimatedCitation:Option[Long],
- OriginalVenue:String, FamilyId:Option[Long], CreatedDate:java.sql.Timestamp) {}
+case class MagPapers(PaperId: Long, Rank: Integer, Doi: String,
+ DocType: String, PaperTitle: String, OriginalTitle: String,
+ BookTitle: String, Year: Option[Integer], Date: Option[java.sql.Timestamp], Publisher: String,
+ JournalId: Option[Long], ConferenceSeriesId: Option[Long], ConferenceInstanceId: Option[Long],
+ Volume: String, Issue: String, FirstPage: String, LastPage: String,
+ ReferenceCount: Option[Long], CitationCount: Option[Long], EstimatedCitation: Option[Long],
+ OriginalVenue: String, FamilyId: Option[Long], CreatedDate: java.sql.Timestamp) {}
-case class PaperAbstract(PaperId:Long,IndexedAbstract:String) {}
+case class MagPaperAbstract(PaperId: Long, IndexedAbstract: String) {}
+case class MagAuthor(AuthorId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], LastKnownAffiliationId: Option[Long], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {}
+
+case class MagAffiliation(AffiliationId: Long, Rank: Int, NormalizedName: String, DisplayName: String, GridId: String, OfficialPage: String, WikiPage: String, PaperCount: Long, CitationCount: Long, Latitude: Option[Float], Longitude: Option[Float], CreatedDate: java.sql.Timestamp) {}
+
+case class MagPaperAuthorAffiliation(PaperId: Long, AuthorId: Long, AffiliationId: Option[Long], AuthorSequenceNumber: Int, OriginalAuthor: String, OriginalAffiliation: String) {}
+
+
+case class MagAuthorAffiliation(author: MagAuthor, affiliation:String)
+
+case class MagPaperWithAuthorList(PaperId: Long, authors: List[MagAuthorAffiliation]) {}
+
+case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String) {}
+
+case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option[String], LanguageCode: Option[String]) {}
+
+case class MagUrl(PaperId: Long, instances: List[String])
+
+
+case class MagJournal(JournalId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], Issn: Option[String], Publisher: Option[String], Webpage: Option[String], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {}
case object ConversionUtil {
+ def extractMagIdentifier(pids:mutable.Buffer[String]) :String ={
+ val magIDRegex: Regex = "^[0-9]+$".r
+ val s =pids.filter(p=> magIDRegex.findAllIn(p).hasNext)
-
- def transformPaperAbstract(input:PaperAbstract) : PaperAbstract = {
- PaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract))
+ if (s.nonEmpty)
+ return s.head
+ null
}
- def convertInvertedIndexString(json_input:String) :String = {
+ def addInstances(a: (Publication, MagUrl)): Publication = {
+ val pub = a._1
+ val urls = a._2
+
+
+ val i = new Instance
+
+
+ if (urls!= null) {
+
+ val l:List[String] = urls.instances.filter(k=>k.nonEmpty):::List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}")
+
+ i.setUrl(l.asJava)
+ }
+ else
+ i.setUrl(List(s"https://academic.microsoft.com/#/detail/${extractMagIdentifier(pub.getOriginalId.asScala)}").asJava)
+
+ i.setCollectedfrom(createMAGCollectedFrom())
+ pub.setInstance(List(i).asJava)
+ pub
+ }
+
+
+ def transformPaperAbstract(input: MagPaperAbstract): MagPaperAbstract = {
+ MagPaperAbstract(input.PaperId, convertInvertedIndexString(input.IndexedAbstract))
+ }
+
+
+ def createOAFFromJournalAuthorPaper(inputParams: ((MagPapers, MagJournal), MagPaperWithAuthorList)): Publication = {
+ val paper = inputParams._1._1
+ val journal = inputParams._1._2
+ val authors = inputParams._2
+
+ val pub = new Publication
+ pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
+ pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
+
+ //Set identifier as {50|60} | doiboost____::md5(DOI)
+ pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
+
+ val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
+ val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title")
+ pub.setTitle(List(mainTitles, originalTitles).asJava)
+
+ pub.setSource(List(asField(paper.BookTitle)).asJava)
+
+ val authorsOAF = authors.authors.map { f: MagAuthorAffiliation =>
+
+ val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author
+
+ a.setFullname(f.author.DisplayName.get)
+
+ if(f.affiliation!= null)
+ a.setAffiliation(List(asField(f.affiliation)).asJava)
+ a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava)
+ a
+ }
+ pub.setAuthor(authorsOAF.asJava)
+
+
+ if (paper.Date != null && paper.Date.isDefined) {
+ pub.setDateofacceptance(asField(paper.Date.get.toString))
+ }
+ pub.setPublisher(asField(paper.Publisher))
+
+
+ if (journal != null && journal.DisplayName.isDefined) {
+ val j = new Journal
+
+ j.setName(journal.DisplayName.get)
+ j.setSp(paper.FirstPage)
+ j.setEp(paper.LastPage)
+ if (journal.Publisher.isDefined)
+ j.setEdition(journal.Publisher.get)
+ if (journal.Issn.isDefined)
+ j.setIssnPrinted(journal.Issn.get)
+ pub.setJournal(j)
+ }
+ pub
+ }
+
+
+ def createOAF(inputParams: ((MagPapers, MagPaperWithAuthorList), MagPaperAbstract)): Publication = {
+
+ val paper = inputParams._1._1
+ val authors = inputParams._1._2
+ val description = inputParams._2
+
+ val pub = new Publication
+ pub.setPid(List(createSP(paper.Doi.toLowerCase, "doi", PID_TYPES)).asJava)
+ pub.setOriginalId(List(paper.PaperId.toString, paper.Doi.toLowerCase).asJava)
+
+ //Set identifier as {50|60} | doiboost____::md5(DOI)
+ pub.setId(generateIdentifier(pub, paper.Doi.toLowerCase))
+
+ val mainTitles = createSP(paper.PaperTitle, "main title", "dnet:dataCite_title")
+ val originalTitles = createSP(paper.OriginalTitle, "alternative title", "dnet:dataCite_title")
+ pub.setTitle(List(mainTitles, originalTitles).asJava)
+
+ pub.setSource(List(asField(paper.BookTitle)).asJava)
+
+
+ if (description != null) {
+ pub.setDescription(List(asField(description.IndexedAbstract)).asJava)
+ }
+
+
+ val authorsOAF = authors.authors.map { f: MagAuthorAffiliation =>
+
+ val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author
+
+ a.setFullname(f.author.DisplayName.get)
+
+ if(f.affiliation!= null)
+ a.setAffiliation(List(asField(f.affiliation)).asJava)
+
+
+ a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava)
+
+ a
+
+ }
+
+
+ if (paper.Date != null) {
+ pub.setDateofacceptance(asField(paper.Date.toString))
+ }
+
+ pub.setAuthor(authorsOAF.asJava)
+
+
+ pub
+
+ }
+
+
+ def convertInvertedIndexString(json_input: String): String = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(json_input)
-
-
-
val idl = (json \ "IndexLength").extract[Int]
-
if (idl > 0) {
val res = Array.ofDim[String](idl)
val iid = (json \ "InvertedIndex").extract[Map[String, List[Int]]]
- for {(k:String,v:List[Int]) <- iid}{
+ for {(k: String, v: List[Int]) <- iid} {
v.foreach(item => res(item) = k)
}
+ (0 until idl).foreach(i => {
+ if (res(i) == null)
+ res(i) = ""
+ })
return res.mkString(" ")
-
}
""
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
index 4c014a95c..a0e20be1a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
@@ -1,13 +1,17 @@
package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.Publication
+import eu.dnetlib.doiboost.DoiBoostMappingUtil.asField
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
+import scala.collection.JavaConverters._
+
object SparkPreProcessMAG {
@@ -23,15 +27,21 @@ object SparkPreProcessMAG {
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
+
+ val sourcePath = parser.get("sourcePath")
import spark.implicits._
+ implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
+ implicit val tupleForJoinEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
+
+
logger.info("Phase 1) make uninque DOI in Papers:")
- val d: Dataset[Papers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[Papers]
+ val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers]
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
- val result: RDD[Papers] = d.where(col("Doi").isNotNull).rdd.map { p: Papers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: Papers, p2: Papers) =>
+ val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) =>
var r = if (p1 == null) p2 else p1
if (p1 != null && p2 != null) {
if (p1.CreatedDate != null && p2.CreatedDate != null) {
@@ -46,16 +56,83 @@ object SparkPreProcessMAG {
r
}.map(_._2)
- val distinctPaper: Dataset[Papers] = spark.createDataset(result)
+ val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
logger.info(s"Total number of element: ${result.count()}")
- logger.info("Phase 2) convert InverdIndex Abastrac to string")
- val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[PaperAbstract]
+ logger.info("Phase 3) Group Author by PaperId")
+ val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor]
+
+ val affiliation =spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation]
+
+ val paperAuthorAffiliation =spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation]
+
+
+ paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId")))
+ .map{case (a:MagPaperAuthorAffiliation,b:MagAuthor )=> (a.AffiliationId,MagPaperAuthorDenormalized(a.PaperId, b, null)) }
+ .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left")
+ .map(s => {
+ val mpa = s._1._2
+ val af = s._2
+ if (af!= null) {
+ MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName)
+ } else
+ mpa
+ }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
+ .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors")
+
+
+
+ logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
+
+
+ val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal]
+
+ val papers =spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers]
+
+ val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList]
+
+
+
+ val firstJoin =papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")),"left")
+ firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left")
+ .map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2")
+
+
+
+ var magPubs:Dataset[(String,Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)]
+
+ val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
+
+
+ logger.info("Phase 5) enrich publication with URL and Instances")
+
+ magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
+ .map{a:((String,Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2))}
+ .write.mode(SaveMode.Overwrite)
+ .save(s"${parser.get("targetPath")}/merge_step_3")
+
+
+
+ logger.info("Phase 6) Enrich Publication with description")
+ val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
+ val paperAbstract =spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract]
- distinctPaper.joinWith(pa, col("PaperId").eqia)
+
+ magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)]
+
+ magPubs.joinWith(paperAbstract,col("_1").equalTo(paperAbstract("PaperId")), "left").map(p=>
+ {
+ val pub = p._1._2
+ val abst = p._2
+ if (abst!= null) {
+ pub.setDescription(List(asField(abst.IndexedAbstract)).asJava)
+ }
+ pub
+ }
+ ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4")
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
index ba6eea364..2277b79b0 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
@@ -72,6 +72,7 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
--sourcePath${sourcePath}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml
index 3726022cb..a720e7592 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/config-default.xml
@@ -11,6 +11,10 @@
queueName
default
+
+ oozie.use.system.libpath
+ true
+
oozie.action.sharelib.for.spark
spark2
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml
index a4d65ed00..f258fae6e 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_gen_authors/oozie_app/workflow.xml
@@ -16,6 +16,10 @@
sparkExecutorCores
number of cores used by single executor
+
+ outputPath
+ the working dir base path
+
@@ -47,6 +51,7 @@
-mt yarn
--workingPath_orcid${workingPath_activities}/
+ -o${outputPath}/
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
index 0aaaeb377..4d26969dd 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
@@ -1,10 +1,21 @@
package eu.dnetlib.doiboost.mag
-import org.codehaus.jackson.map.ObjectMapper
+import eu.dnetlib.dhp.schema.oaf.Publication
+import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature
+import org.apache.spark.SparkConf
+import org.apache.spark.api.java.function.MapFunction
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
import org.junit.jupiter.api.Assertions._
+import org.apache.spark.sql.functions._
+
+import scala.collection.JavaConverters._
import scala.io.Source
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
class MAGMappingTest {
@@ -13,14 +24,49 @@ class MAGMappingTest {
val mapper = new ObjectMapper()
- //@Test
+ @Test
def testMAGCSV(): Unit = {
- SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" "))
+ // SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" "))
+
+ val sparkConf: SparkConf = new SparkConf
+
+ val spark: SparkSession = SparkSession.builder()
+ .config(sparkConf)
+ .appName(getClass.getSimpleName)
+ .master("local[*]")
+ .getOrCreate()
+
+ import spark.implicits._
+
+
+ implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
+ implicit val longBarEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
+
+ val sourcePath = "/data/doiboost/mag/input"
+
+ mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
+
+
+ val magOAF = spark.read.load("$sourcePath/merge_step_4").as[Publication]
+
+ println(magOAF.first().getOriginalId)
+
+
+ magOAF.map(k => (ConversionUtil.extractMagIdentifier(k.getOriginalId.asScala),k)).as[(String,Publication)].show()
+
+
+ println((ConversionUtil.extractMagIdentifier(magOAF.first().getOriginalId.asScala)))
+
+ val magIDRegex: Regex = "^[0-9]+$".r
+
+
+ println(magIDRegex.findFirstMatchIn("suca").isDefined)
+
}
@Test
- def buildInvertedIndexTest() :Unit = {
+ def buildInvertedIndexTest(): Unit = {
val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString
val description = ConversionUtil.convertInvertedIndexString(json_input)
assertNotNull(description)
@@ -32,3 +78,5 @@ class MAGMappingTest {
}
+
+