diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
index 68a3231e06..3980217d83 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
@@ -21,11 +21,15 @@ object DoiBoostMappingUtil {
def generateDataInfo(): DataInfo = {
+ generateDataInfo("0.9")
+ }
+
+ def generateDataInfo(trust: String): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
- di.setTrust("0.9")
+ di.setTrust(trust)
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
di
}
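Note: the zero-argument generateDataInfo() keeps its previous behaviour (trust "0.9"), while the new overload lets callers derive trust per item, as the MAG field-of-study mapping below does with DoiBoostMappingUtil.generateDataInfo(s.Score.toString). A minimal usage sketch (the "0.75" value is illustrative):

    val defaultInfo = DoiBoostMappingUtil.generateDataInfo()       // trust = "0.9"
    val scoredInfo  = DoiBoostMappingUtil.generateDataInfo("0.75") // e.g. a MAG subject score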
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index eda3bf17a5..b515a73306 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -137,9 +137,21 @@ case object Crossref2Oaf {
if (StringUtils.isNotBlank(issuedDate)) {
result.setDateofacceptance(asField(issuedDate))
}
+ else {
+ result.setDateofacceptance(asField(createdDate.getValue))
+ }
result.setRelevantdate(List(createdDate, postedDate, acceptedDate, publishedOnlineDate, publishedPrintDate).filter(p => p != null).asJava)
+ //Mapping Subject
+ val subjectList: List[String] = (json \ "subject").extractOrElse[List[String]](List())
+
+ if (subjectList.nonEmpty) {
+ result.setSubject(subjectList.map(s => createSP(s, "keywords", "dnet:subject_classification_typologies")).asJava)
+ }
+
//Mapping Author
val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List())
result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava)
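Note: the subject mapping relies on json4s extraction, so a Crossref record without a "subject" key falls back to the empty list and setSubject is never called. The same pattern in isolation (the sample JSON and values are illustrative):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats: DefaultFormats.type = DefaultFormats
    val json = parse("""{"subject": ["Ecology", "Genetics"]}""")
    val subjectList = (json \ "subject").extractOrElse[List[String]](List())
    // subjectList == List("Ecology", "Genetics"); a missing key yields List()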
@@ -224,6 +236,16 @@ case object Crossref2Oaf {
val queue = new mutable.Queue[Relation]
+ def snfRule(award: String): String = {
+ val tmp1 = StringUtils.substringAfter(award, "_")
+ val tmp2 = StringUtils.substringBefore(tmp1, "/")
+ logger.debug(s"From $award to $tmp2")
+ tmp2
+ }
+
def extractECAward(award: String): String = {
val awardECRegex: Regex = "[0-9]{4,9}".r
if (awardECRegex.findAllIn(award).hasNext)
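Note: snfRule keeps the SNSF project number between the first underscore and any "/" suffix, unlike extractECAward, which only matches a 4-9 digit run. Applied to the awards in the updated awardTest.json fixture below:

    snfRule("CR32I3_156724")   // "156724"
    snfRule("31003A_173281/1") // "173281"
    snfRule("200021_165850")   // "165850"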
@@ -289,7 +311,7 @@ case object Crossref2Oaf {
case "10.13039/501100006588" |
"10.13039/501100004488" => generateSimpleRelationFromAward(funder, "irb_hr______", a=>a.replaceAll("Project No.", "").replaceAll("HRZZ-","") )
case "10.13039/501100006769"=> generateSimpleRelationFromAward(funder, "rsf_________", a=>a)
- case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", extractECAward)
+ case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", snfRule)
case "10.13039/501100004410"=> generateSimpleRelationFromAward(funder, "tubitakf____", a =>a)
case "10.10.13039/100004440"=> generateSimpleRelationFromAward(funder, "wt__________", a =>a)
case "10.13039/100004440"=> queue += generateRelation(sourceId,"1e5e62235d094afd01cd56e65112fc63", "wt__________" )
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 17f0395ca0..e83236d53a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -40,6 +40,9 @@ case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option
case class MagUrl(PaperId: Long, instances: List[String])
+case class MagSubject(FieldOfStudyId: Long, DisplayName: String, MainType: Option[String], Score: Float) {}
+
+case class MagFieldOfStudy(PaperId: Long, subjects: List[MagSubject]) {}
case class MagJournal(JournalId: Long, Rank: Option[Int], NormalizedName: Option[String], DisplayName: Option[String], Issn: Option[String], Publisher: Option[String], Webpage: Option[String], PaperCount: Option[Long], CitationCount: Option[Long], CreatedDate: Option[java.sql.Timestamp]) {}
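Note: MagSubject and MagFieldOfStudy mirror the rows produced by the Phase 7 groupBy/collect_list aggregation in SparkPreProcessMAG below: one MagFieldOfStudy per paper, carrying all of its scored subjects. An illustrative instance (ids, names and score are made up):

    MagFieldOfStudy(42L, List(MagSubject(1204L, "Team sport", Some("sports.team"), 0.5f)))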
@@ -135,6 +138,8 @@ case object ConversionUtil {
j.setIssnPrinted(journal.Issn.get)
pub.setJournal(j)
}
+ pub.setCollectedfrom(List(createMAGCollectedFrom()).asJava)
+ pub.setDataInfo(generateDataInfo())
pub
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
index a0e20be1aa..0001c83c68 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
@@ -1,20 +1,18 @@
package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.Publication
-import eu.dnetlib.doiboost.DoiBoostMappingUtil.asField
+import eu.dnetlib.dhp.schema.oaf.{Publication, StructuredProperty}
+import eu.dnetlib.doiboost.DoiBoostMappingUtil
+import eu.dnetlib.doiboost.DoiBoostMappingUtil.{asField, createSP}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
-
import scala.collection.JavaConverters._
object SparkPreProcessMAG {
-
-
def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -31,15 +29,11 @@ object SparkPreProcessMAG {
val sourcePath = parser.get("sourcePath")
import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
- implicit val tupleForJoinEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
-
-
+ implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
logger.info("Phase 1) make uninque DOI in Papers:")
-
val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers]
-
// Filter Papers with a DOI; since the same DOI can appear under multiple PaperIds with different versions, we keep the last one
val result: RDD[MagPapers] = d.where(col("Doi").isNotNull).rdd.map { p: MagPapers => Tuple2(p.Doi, p) }.reduceByKey { case (p1: MagPapers, p2: MagPapers) =>
var r = if (p1 == null) p2 else p1
@@ -56,6 +50,8 @@ object SparkPreProcessMAG {
r
}.map(_._2)
+
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
logger.info(s"Total number of element: ${result.count()}")
@@ -63,44 +59,36 @@ object SparkPreProcessMAG {
logger.info("Phase 3) Group Author by PaperId")
val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor]
- val affiliation =spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation]
-
- val paperAuthorAffiliation =spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation]
-
+ val affiliation = spark.read.load(s"$sourcePath/Affiliations").as[MagAffiliation]
+ val paperAuthorAffiliation = spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation]
paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId")))
- .map{case (a:MagPaperAuthorAffiliation,b:MagAuthor )=> (a.AffiliationId,MagPaperAuthorDenormalized(a.PaperId, b, null)) }
+ .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null)) }
.joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left")
.map(s => {
- val mpa = s._1._2
- val af = s._2
- if (af!= null) {
- MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName)
- } else
- mpa
- }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
+ val mpa = s._1._2
+ val af = s._2
+ if (af != null) {
+ MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName)
+ } else
+ mpa
+ }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors")
-
-
logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
-
val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal]
- val papers =spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers]
+ val papers = spark.read.load(s"${parser.get("targetPath")}/Papers_distinct").as[MagPapers]
val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList]
-
-
- val firstJoin =papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")),"left")
+ val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left")
firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left")
.map { a: ((MagPapers, MagJournal), MagPaperWithAuthorList) => ConversionUtil.createOAFFromJournalAuthorPaper(a) }.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2")
-
- var magPubs:Dataset[(String,Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)]
+ var magPubs: Dataset[(String, Publication)] = spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
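Note: groupBy + collect_list(struct(...)) followed by .as[CaseClass] is the denormalization idiom used throughout this job (authors in Phase 3, URLs here, subjects in Phase 7): the collected struct columns must line up with the fields of the element type expected by the target encoder. The shape of the pattern on a toy schema (names are illustrative; assumes links: Dataset[Link] and spark.implicits._ in scope):

    import org.apache.spark.sql.functions.{collect_list, struct}

    case class Link(PaperId: Long, url: String)
    case class Links(PaperId: Long, instances: Seq[Link])

    links.groupBy($"PaperId")
      .agg(collect_list(struct($"PaperId", $"url")).as("instances"))
      .as[Links]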
@@ -108,33 +96,69 @@ object SparkPreProcessMAG {
logger.info("Phase 5) enrich publication with URL and Instances")
magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
- .map{a:((String,Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2))}
+ .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
.write.mode(SaveMode.Overwrite)
.save(s"${parser.get("targetPath")}/merge_step_3")
-
logger.info("Phase 6) Enrich Publication with description")
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
- val paperAbstract =spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract]
+ val paperAbstract = spark.read.load(s"${parser.get("targetPath")}/PaperAbstract").as[MagPaperAbstract]
- magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String,Publication)]
+ magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
- magPubs.joinWith(paperAbstract,col("_1").equalTo(paperAbstract("PaperId")), "left").map(p=>
- {
- val pub = p._1._2
- val abst = p._2
- if (abst!= null) {
- pub.setDescription(List(asField(abst.IndexedAbstract)).asJava)
+ magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left").map(p => {
+ val pub = p._1._2
+ val abst = p._2
+ if (abst != null) {
+ pub.setDescription(List(asField(abst.IndexedAbstract)).asJava)
+ }
+ pub
+ }
+ ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4")
+
+
+ logger.info("Phase 7) Enrich Publication with FieldOfStudy")
+
+ magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication].map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
+
+ val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType")
+
+ val pfos = spark.read.load(s"$sourcePath/PaperFieldsOfStudy")
+
+ val paperField = pfos.joinWith(fos, fos("fos").equalTo(pfos("FieldOfStudyId")))
+ .select($"_1.FieldOfStudyId", $"_2.DisplayName", $"_2.MainType", $"_1.PaperId", $"_1.Score")
+ .groupBy($"PaperId").agg(collect_list(struct($"FieldOfStudyId", $"DisplayName", $"MainType", $"Score")).as("subjects"))
+ .as[MagFieldOfStudy]
+
+ magPubs.joinWith(paperField, col("_1").equalTo(paperField("PaperId")), "left")
+ .map(item => {
+ val publication = item._1._2
+ val fieldOfStudy = item._2
+ if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
+ val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
+ val s1 = createSP(s.DisplayName, "keywords", "dnet:subject_classification_typologies")
+ val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
+ var resList: List[StructuredProperty] = List(s1)
+ if (s.MainType.isDefined) {
+ val maintp = s.MainType.get
+ val s2 = createSP(maintp, "keywords", "dnet:subject_classification_typologies")
+ s2.setDataInfo(di)
+ resList = resList ::: List(s2)
+ if (maintp.contains(".")) {
+ val s3 = createSP(maintp.split("\\.").head, "keywords", "dnet:subject_classification_typologies")
+ s3.setDataInfo(di)
+ resList = resList ::: List(s3)
+ }
+ }
+ resList
+ })
+ publication.setSubject(p.asJava)
}
- pub
- }
- ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4")
-
+ publication
+ }).map { s: Publication => s }(Encoders.bean(classOf[Publication])).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication")
}
-
-
}
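Note: per subject, Phase 7 emits up to three keyword StructuredPropertys: the DisplayName (without its own DataInfo), the full MainType, and, when the MainType is dotted, its leading segment, the latter two carrying the score-derived trust. For the MainType exercised by testSplitter below (values illustrative):

    // MagSubject(_, "Team sport", Some("sports.team"), 0.5f)
    // emitted subjects: "Team sport", "sports.team", "sports"
    // "sports.team" and "sports" carry generateDataInfo("0.5") as DataInfo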
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
index 2277b79b0f..6b4cad2af7 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
@@ -22,7 +22,7 @@
-
+
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala
index 2d7cf42164..0f9a3f8fb9 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/CrossrefMappingTest.scala
@@ -90,9 +90,9 @@ class CrossrefMappingTest {
val rels:List[Relation] = resultList.filter(p => p.isInstanceOf[Relation]).map(r=> r.asInstanceOf[Relation])
- assertEquals(rels.size, 4)
- rels.foreach(s => logger.info(s.getTarget))
+ rels.foreach(s => logger.info(s.getTarget))
+ assertEquals(3, rels.size)
}
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
index 4d26969dd9..1567e0008b 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
@@ -24,47 +24,21 @@ class MAGMappingTest {
val mapper = new ObjectMapper()
+
@Test
- def testMAGCSV(): Unit = {
- // SparkPreProcessMAG.main("-m local[*] -s /data/doiboost/mag/datasets -t /data/doiboost/mag/datasets/preprocess".split(" "))
-
- val sparkConf: SparkConf = new SparkConf
-
- val spark: SparkSession = SparkSession.builder()
- .config(sparkConf)
- .appName(getClass.getSimpleName)
- .master("local[*]")
- .getOrCreate()
-
- import spark.implicits._
+ def testSplitter(): Unit = {
+ val s = "sports.team"
- implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
- implicit val longBarEncoder = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
-
- val sourcePath = "/data/doiboost/mag/input"
-
- mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
-
-
- val magOAF = spark.read.load("$sourcePath/merge_step_4").as[Publication]
-
- println(magOAF.first().getOriginalId)
-
-
- magOAF.map(k => (ConversionUtil.extractMagIdentifier(k.getOriginalId.asScala),k)).as[(String,Publication)].show()
-
-
- println((ConversionUtil.extractMagIdentifier(magOAF.first().getOriginalId.asScala)))
-
- val magIDRegex: Regex = "^[0-9]+$".r
-
-
- println(magIDRegex.findFirstMatchIn("suca").isDefined)
+ if (s.contains(".")) {
+ println(s.split("\\.").head)
+ }
}
+
@Test
def buildInvertedIndexTest(): Unit = {
val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString
diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json
index f570b3bdbc..7c6e7beb92 100644
--- a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json
+++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/awardTest.json
@@ -179,12 +179,13 @@
],
"funder": [
{
+ "DOI": "10.13039/501100001711",
+ "name": "Swiss National Science Foundation (Schweizerische Nationalfonds)",
"doi-asserted-by": "publisher",
- "DOI": "10.13039/501100000781",
- "name": "European Research Council",
"award": [
- "284236-REPCOLLAB",
- "FP7/2007-2013"
+ "CR32I3_156724",
+ "31003A_173281/1",
+ "200021_165850"
]
}
],