diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index c4c8aeb61..4b5a8f2cc 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp
1.2.4-SNAPSHOT
- ../
+ ../pom.xml
dhp-common
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index d1ceb9a07..db28eaf7a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -1,5 +1,6 @@
package eu.dnetlib.doiboost.crossref
+import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
@@ -13,12 +14,13 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
+import eu.dnetlib.dhp.schema.scholexplorer.OafUtils
case class CrossrefDT(doi: String, json:String, timestamp: Long) {}
case class mappingAffiliation(name: String) {}
-case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {}
+case class mappingAuthor(given: Option[String], family: String, sequence:Option[String], ORCID: Option[String], affiliation: Option[mappingAffiliation]) {}
case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {}
@@ -154,7 +156,12 @@ case object Crossref2Oaf {
//Mapping Author
val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List())
- result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava)
+
+
+
+ val sorted_list = authorList.sortWith((a:mappingAuthor, b:mappingAuthor) => a.sequence.isDefined && a.sequence.get.equalsIgnoreCase("first"))
+
+ result.setAuthor(sorted_list.zipWithIndex.map{case (a, index) => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull, index)}.asJava)
// Mapping instance
val instance = new Instance()
@@ -170,14 +177,14 @@ case object Crossref2Oaf {
if(has_review != JNothing) {
instance.setRefereed(
- createQualifier("0001", "peerReviewed", "dnet:review_levels", "dnet:review_levels"))
+ OafUtils.createQualifier("0001", "peerReviewed", ModelConstants.DNET_REVIEW_LEVELS, ModelConstants.DNET_REVIEW_LEVELS))
}
instance.setAccessright(getRestrictedQualifier())
result.setInstance(List(instance).asJava)
- instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
- result.setResourcetype(createQualifier(cobjCategory.substring(0, 4),"dnet:dataCite_resource"))
+ instance.setInstancetype(OafUtils.createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE))
+ result.setResourcetype(OafUtils.createQualifier(cobjCategory.substring(0, 4),"dnet:dataCite_resource"))
instance.setCollectedfrom(createCrossrefCollectedFrom())
if (StringUtils.isNotBlank(issuedDate)) {
@@ -194,13 +201,14 @@ case object Crossref2Oaf {
}
- def generateAuhtor(given: String, family: String, orcid: String): Author = {
+ def generateAuhtor(given: String, family: String, orcid: String, index:Int): Author = {
val a = new Author
a.setName(given)
a.setSurname(family)
a.setFullname(s"$given $family")
+ a.setRank(index+1)
if (StringUtils.isNotBlank(orcid))
- a.setPid(List(createSP(orcid, ORCID_PENDING, PID_TYPES, generateDataInfo())).asJava)
+ a.setPid(List(createSP(orcid, ModelConstants.ORCID_PENDING, ModelConstants.DNET_PID_TYPES, generateDataInfo())).asJava)
a
}
@@ -221,7 +229,7 @@ case object Crossref2Oaf {
val result = generateItemFromType(objectType, objectSubType)
if (result == null)
return List()
- val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"));
+ val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type"))
mappingResult(result, json, cOBJCategory)
@@ -299,77 +307,77 @@ case object Crossref2Oaf {
if (funders != null)
- funders.foreach(funder => {
- if (funder.DOI.isDefined && funder.DOI.get.nonEmpty) {
- funder.DOI.get match {
- case "10.13039/100010663" |
- "10.13039/100010661" |
- "10.13039/501100007601" |
- "10.13039/501100000780" |
- "10.13039/100010665" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
- case "10.13039/100011199" |
- "10.13039/100004431" |
- "10.13039/501100004963" |
- "10.13039/501100000780" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
- case "10.13039/501100000781" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
- generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
- case "10.13039/100000001" => generateSimpleRelationFromAward(funder, "nsf_________", a => a)
- case "10.13039/501100001665" => generateSimpleRelationFromAward(funder, "anr_________", a => a)
- case "10.13039/501100002341" => generateSimpleRelationFromAward(funder, "aka_________", a => a)
- case "10.13039/501100001602" => generateSimpleRelationFromAward(funder, "aka_________", a => a.replace("SFI", ""))
- case "10.13039/501100000923" => generateSimpleRelationFromAward(funder, "arc_________", a => a)
- case "10.13039/501100000038"=> val targetId = getProjectId("nserc_______" , "1e5e62235d094afd01cd56e65112fc63")
- queue += generateRelation(sourceId, targetId, "isProducedBy" )
- queue += generateRelation(targetId, sourceId, "produces" )
- case "10.13039/501100000155"=> val targetId = getProjectId("sshrc_______" , "1e5e62235d094afd01cd56e65112fc63")
- queue += generateRelation(sourceId,targetId, "isProducedBy" )
- queue += generateRelation(targetId,sourceId, "produces" )
- case "10.13039/501100000024"=> val targetId = getProjectId("cihr________" , "1e5e62235d094afd01cd56e65112fc63")
- queue += generateRelation(sourceId,targetId, "isProducedBy" )
- queue += generateRelation(targetId,sourceId, "produces" )
- case "10.13039/501100002848" => generateSimpleRelationFromAward(funder, "conicytf____", a => a)
- case "10.13039/501100003448" => generateSimpleRelationFromAward(funder, "gsrt________", extractECAward)
- case "10.13039/501100010198" => generateSimpleRelationFromAward(funder, "sgov________", a=>a)
- case "10.13039/501100004564" => generateSimpleRelationFromAward(funder, "mestd_______", extractECAward)
- case "10.13039/501100003407" => generateSimpleRelationFromAward(funder, "miur________", a=>a)
- val targetId = getProjectId("miur________" , "1e5e62235d094afd01cd56e65112fc63")
- queue += generateRelation(sourceId,targetId, "isProducedBy" )
- queue += generateRelation(targetId,sourceId, "produces" )
- case "10.13039/501100006588" |
- "10.13039/501100004488" => generateSimpleRelationFromAward(funder, "irb_hr______", a=>a.replaceAll("Project No.", "").replaceAll("HRZZ-","") )
- case "10.13039/501100006769"=> generateSimpleRelationFromAward(funder, "rsf_________", a=>a)
- case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", snsfRule)
- case "10.13039/501100004410"=> generateSimpleRelationFromAward(funder, "tubitakf____", a =>a)
- case "10.10.13039/100004440"=> generateSimpleRelationFromAward(funder, "wt__________", a =>a)
- case "10.13039/100004440"=> val targetId = getProjectId("wt__________" , "1e5e62235d094afd01cd56e65112fc63")
- queue += generateRelation(sourceId,targetId, "isProducedBy" )
- queue += generateRelation(targetId,sourceId, "produces" )
+ funders.foreach(funder => {
+ if (funder.DOI.isDefined && funder.DOI.get.nonEmpty) {
+ funder.DOI.get match {
+ case "10.13039/100010663" |
+ "10.13039/100010661" |
+ "10.13039/501100007601" |
+ "10.13039/501100000780" |
+ "10.13039/100010665" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
+ case "10.13039/100011199" |
+ "10.13039/100004431" |
+ "10.13039/501100004963" |
+ "10.13039/501100000780" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
+ case "10.13039/501100000781" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
+ generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
+ case "10.13039/100000001" => generateSimpleRelationFromAward(funder, "nsf_________", a => a)
+ case "10.13039/501100001665" => generateSimpleRelationFromAward(funder, "anr_________", a => a)
+ case "10.13039/501100002341" => generateSimpleRelationFromAward(funder, "aka_________", a => a)
+ case "10.13039/501100001602" => generateSimpleRelationFromAward(funder, "aka_________", a => a.replace("SFI", ""))
+ case "10.13039/501100000923" => generateSimpleRelationFromAward(funder, "arc_________", a => a)
+ case "10.13039/501100000038"=> val targetId = getProjectId("nserc_______" , "1e5e62235d094afd01cd56e65112fc63")
+ queue += generateRelation(sourceId, targetId, "isProducedBy" )
+ queue += generateRelation(targetId, sourceId, "produces" )
+ case "10.13039/501100000155"=> val targetId = getProjectId("sshrc_______" , "1e5e62235d094afd01cd56e65112fc63")
+ queue += generateRelation(sourceId,targetId, "isProducedBy" )
+ queue += generateRelation(targetId,sourceId, "produces" )
+ case "10.13039/501100000024"=> val targetId = getProjectId("cihr________" , "1e5e62235d094afd01cd56e65112fc63")
+ queue += generateRelation(sourceId,targetId, "isProducedBy" )
+ queue += generateRelation(targetId,sourceId, "produces" )
+ case "10.13039/501100002848" => generateSimpleRelationFromAward(funder, "conicytf____", a => a)
+ case "10.13039/501100003448" => generateSimpleRelationFromAward(funder, "gsrt________", extractECAward)
+ case "10.13039/501100010198" => generateSimpleRelationFromAward(funder, "sgov________", a=>a)
+ case "10.13039/501100004564" => generateSimpleRelationFromAward(funder, "mestd_______", extractECAward)
+ case "10.13039/501100003407" => generateSimpleRelationFromAward(funder, "miur________", a=>a)
+ val targetId = getProjectId("miur________" , "1e5e62235d094afd01cd56e65112fc63")
+ queue += generateRelation(sourceId,targetId, "isProducedBy" )
+ queue += generateRelation(targetId,sourceId, "produces" )
+ case "10.13039/501100006588" |
+ "10.13039/501100004488" => generateSimpleRelationFromAward(funder, "irb_hr______", a=>a.replaceAll("Project No.", "").replaceAll("HRZZ-","") )
+ case "10.13039/501100006769"=> generateSimpleRelationFromAward(funder, "rsf_________", a=>a)
+ case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", snsfRule)
+ case "10.13039/501100004410"=> generateSimpleRelationFromAward(funder, "tubitakf____", a =>a)
+ case "10.10.13039/100004440"=> generateSimpleRelationFromAward(funder, "wt__________", a =>a)
+ case "10.13039/100004440"=> val targetId = getProjectId("wt__________" , "1e5e62235d094afd01cd56e65112fc63")
+ queue += generateRelation(sourceId,targetId, "isProducedBy" )
+ queue += generateRelation(targetId,sourceId, "produces" )
- case _ => logger.debug("no match for "+funder.DOI.get )
+ case _ => logger.debug("no match for "+funder.DOI.get )
+ }
+
+
+ } else {
+ funder.name match {
+ case "European Union’s Horizon 2020 research and innovation program" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
+ case "European Union's" =>
+ generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
+ generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
+ case "The French National Research Agency (ANR)" |
+ "The French National Research Agency" => generateSimpleRelationFromAward(funder, "anr_________", a => a)
+ case "CONICYT, Programa de Formación de Capital Humano Avanzado" => generateSimpleRelationFromAward(funder, "conicytf____", extractECAward)
+ case "Wellcome Trust Masters Fellowship" => val targetId = getProjectId("wt__________", "1e5e62235d094afd01cd56e65112fc63")
+ queue += generateRelation(sourceId, targetId, "isProducedBy" )
+ queue += generateRelation(targetId, sourceId, "produces" )
+ case _ => logger.debug("no match for "+funder.name )
+
+ }
}
-
- } else {
- funder.name match {
- case "European Union’s Horizon 2020 research and innovation program" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
- case "European Union's" =>
- generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward)
- generateSimpleRelationFromAward(funder, "corda_______", extractECAward)
- case "The French National Research Agency (ANR)" |
- "The French National Research Agency" => generateSimpleRelationFromAward(funder, "anr_________", a => a)
- case "CONICYT, Programa de Formación de Capital Humano Avanzado" => generateSimpleRelationFromAward(funder, "conicytf____", extractECAward)
- case "Wellcome Trust Masters Fellowship" => val targetId = getProjectId("wt__________", "1e5e62235d094afd01cd56e65112fc63")
- queue += generateRelation(sourceId, targetId, "isProducedBy" )
- queue += generateRelation(targetId, sourceId, "produces" )
- case _ => logger.debug("no match for "+funder.name )
-
- }
}
-
- }
- )
+ )
queue.toList
}
@@ -465,4 +473,4 @@ case object Crossref2Oaf {
null
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala
index 4a39a2987..235305fb8 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala
@@ -96,4 +96,4 @@ object CrossrefDataset {
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index 0272cb1a6..0036459bf 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -52,4 +52,4 @@ object SparkMapDumpIntoOAF {
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 7bb4686cf..09b741b47 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -1,6 +1,7 @@
package eu.dnetlib.doiboost.mag
+import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty}
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.json4s
@@ -31,11 +32,11 @@ case class MagAffiliation(AffiliationId: Long, Rank: Int, NormalizedName: String
case class MagPaperAuthorAffiliation(PaperId: Long, AuthorId: Long, AffiliationId: Option[Long], AuthorSequenceNumber: Int, OriginalAuthor: String, OriginalAffiliation: String) {}
-case class MagAuthorAffiliation(author: MagAuthor, affiliation:String)
+case class MagAuthorAffiliation(author: MagAuthor, affiliation:String, sequenceNumber:Int)
case class MagPaperWithAuthorList(PaperId: Long, authors: List[MagAuthorAffiliation]) {}
-case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String) {}
+case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String, sequenceNumber:Int) {}
case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option[String], LanguageCode: Option[String]) {}
@@ -202,12 +203,12 @@ case object ConversionUtil {
val authorsOAF = authors.authors.map { f: MagAuthorAffiliation =>
val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author
-
- a.setFullname(f.author.DisplayName.get)
-
+ a.setRank(f.sequenceNumber)
+ if (f.author.DisplayName.isDefined)
+ a.setFullname(f.author.DisplayName.get)
if(f.affiliation!= null)
a.setAffiliation(List(asField(f.affiliation)).asJava)
- a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava)
+ a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", ModelConstants.DNET_PID_TYPES)).asJava)
a
}
pub.setAuthor(authorsOAF.asJava)
@@ -274,7 +275,7 @@ case object ConversionUtil {
a.setAffiliation(List(asField(f.affiliation)).asJava)
- a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava)
+ a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", ModelConstants.DNET_PID_TYPES)).asJava)
a
@@ -305,12 +306,12 @@ case object ConversionUtil {
for {(k: String, v: List[Int]) <- iid} {
v.foreach(item => res(item) = k)
}
- (0 until idl).foreach(i => {
- if (res(i) == null)
- res(i) = ""
- })
+ (0 until idl).foreach(i => {
+ if (res(i) == null)
+ res(i) = ""
+ })
return res.mkString(" ")
}
""
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
index 88fee72b7..de4e07655 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
@@ -6,10 +6,10 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.spark.sql.functions._
object SparkImportMagIntoDataset {
val datatypedict = Map(
+ "bool" -> BooleanType,
"int" -> IntegerType,
"uint" -> IntegerType,
"long" -> LongType,
@@ -25,11 +25,10 @@ object SparkImportMagIntoDataset {
"AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")),
"Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
- "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
+ "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
"FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
"FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
- // ['FieldOfStudyId:long', 'Rank:uint', 'NormalizedName:string', 'DisplayName:string', 'MainType:string', 'Level:int', 'PaperCount:long', 'PaperFamilyCount:long', 'CitationCount:long', 'CreatedDate:DateTime']
"FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")),
"PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
@@ -37,6 +36,7 @@ object SparkImportMagIntoDataset {
"PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
"PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")),
"PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")),
+ "PaperMeSH" -> Tuple2("advanced/PaperMeSH.txt", Seq("PaperId:long", "DescriptorUI:string", "DescriptorName:string", "QualifierUI:string", "QualifierName:string", "IsMajorTopic:bool")),
"PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")),
"PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")),
"PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")),
@@ -91,4 +91,4 @@ object SparkImportMagIntoDataset {
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
index 780e65c1e..0dcef4176 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
@@ -58,16 +58,16 @@ object SparkProcessMAG {
val paperAuthorAffiliation = spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation]
paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId")))
- .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null)) }
+ .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null, a.AuthorSequenceNumber)) }
.joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left")
.map(s => {
val mpa = s._1._2
val af = s._2
if (af != null) {
- MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName)
+ MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber)
} else
mpa
- }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
+ }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors"))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors")
logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
@@ -86,7 +86,7 @@ object SparkProcessMAG {
var magPubs: Dataset[(String, Publication)] =
spark.read.load(s"$workingPath/merge_step_2").as[Publication]
- .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
+ .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val conference = spark.read.load(s"$sourcePath/ConferenceInstances")
@@ -115,10 +115,9 @@ object SparkProcessMAG {
.save(s"$workingPath/merge_step_3")
-// logger.info("Phase 6) Enrich Publication with description")
-// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
-// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
-
+ // logger.info("Phase 6) Enrich Publication with description")
+ // val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
+ // pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
val paperAbstract = spark.read.load((s"$workingPath/PaperAbstract")).as[MagPaperAbstract]
@@ -127,7 +126,7 @@ object SparkProcessMAG {
magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left")
.map(item => ConversionUtil.updatePubsWithDescription(item)
- ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4")
+ ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4")
logger.info("Phase 7) Enrich Publication with FieldOfStudy")
@@ -153,9 +152,9 @@ object SparkProcessMAG {
val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication]
.map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
- .map(_._2)
+ .map(_._2)
spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
index e9773dbbb..e4b808085 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
@@ -1,17 +1,25 @@
package eu.dnetlib.doiboost.orcid
import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
-import eu.dnetlib.dhp.schema.orcid.OrcidDOI
+import eu.dnetlib.dhp.schema.orcid.{AuthorData, OrcidDOI}
import eu.dnetlib.doiboost.DoiBoostMappingUtil
-import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier}
+import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo}
import org.apache.commons.lang.StringUtils
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
-case class ORCIDItem(oid:String,name:String,surname:String,creditName:String,errorCode:String){}
+case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){}
+case class OrcidAuthor(oid:String, name:Option[String], surname:Option[String], creditName:Option[String], otherNames:Option[List[String]], errorCode:Option[String]){}
+case class OrcidWork(oid:String, doi:String)
+
@@ -44,17 +52,65 @@ object ORCIDToOAF {
}
- def convertTOOAF(input:OrcidDOI) :Publication = {
- val doi = input.getDoi
+ def strValid(s:Option[String]) : Boolean = {
+ s.isDefined && s.get.nonEmpty
+ }
+
+ def authorValid(author:OrcidAuthor): Boolean ={
+ if (strValid(author.name) && strValid(author.surname)) {
+ return true
+ }
+ if (strValid(author.surname)) {
+ return true
+ }
+ if (strValid(author.creditName)) {
+ return true
+
+ }
+ false
+ }
+
+
+ def extractDOIWorks(input:String): List[OrcidWork] = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input)
+
+ val oid = (json \ "workDetail" \"oid").extractOrElse[String](null)
+ if (oid == null)
+ return List()
+ val doi:List[(String, String)] = for {
+ JObject(extIds) <- json \ "workDetail" \"extIds"
+ JField("type", JString(typeValue)) <- extIds
+ JField("value", JString(value)) <- extIds
+ if "doi".equalsIgnoreCase(typeValue)
+ } yield (typeValue, value)
+ if (doi.nonEmpty) {
+ return doi.map(l =>OrcidWork(oid, l._2))
+ }
+ List()
+ }
+
+ def convertORCIDAuthor(input:String): OrcidAuthor = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input)
+
+ (json \"authorData" ).extractOrElse[OrcidAuthor](null)
+ }
+
+
+ def convertTOOAF(input:ORCIDItem) :Publication = {
+ val doi = input.doi
val pub:Publication = new Publication
- pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava)
+ pub.setPid(List(createSP(doi.toLowerCase, "doi", ModelConstants.DNET_PID_TYPES)).asJava)
pub.setDataInfo(generateDataInfo())
- pub.setId(generateIdentifier(pub, doi.toLowerCase))
+
+ pub.setId(DoiBoostMappingUtil.generateIdentifier(pub, doi.toLowerCase))
+
try{
- val l:List[Author]= input.getAuthors.asScala.map(a=> {
- generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid)
- })(collection.breakOut)
+ val l:List[Author]= input.authors.map(a=> {
+ generateAuthor(a)
+ })(collection.breakOut)
pub.setAuthor(l.asJava)
pub.setCollectedfrom(List(DoiBoostMappingUtil.createORIDCollectedFrom()).asJava)
@@ -74,19 +130,23 @@ object ORCIDToOAF {
di
}
- def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = {
+ def generateAuthor(o : OrcidAuthor): Author = {
val a = new Author
- a.setName(given)
- a.setSurname(family)
- if (fullName!= null && fullName.nonEmpty)
- a.setFullname(fullName)
- else
- a.setFullname(s"$given $family")
- if (StringUtils.isNotBlank(orcid))
- a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateOricPIDDatainfo())).asJava)
+ if (strValid(o.name)) {
+ a.setName(o.name.get.capitalize)
+ }
+ if (strValid(o.surname)) {
+ a.setSurname(o.surname.get.capitalize)
+ }
+ if(strValid(o.name) && strValid(o.surname))
+ a.setFullname(s"${o.name.get.capitalize} ${o.surname.get.capitalize}")
+ else if (strValid(o.creditName))
+ a.setFullname(o.creditName.get)
+ if (StringUtils.isNotBlank(o.oid))
+ a.setPid(List(createSP(o.oid, ModelConstants.ORCID, ModelConstants.DNET_PID_TYPES, generateOricPIDDatainfo())).asJava)
a
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
index f1c7c58b4..025d68b90 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
@@ -1,72 +1,49 @@
package eu.dnetlib.doiboost.orcid
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.Publication
-import eu.dnetlib.dhp.schema.orcid.OrcidDOI
-import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.expressions.Aggregator
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
object SparkConvertORCIDToOAF {
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
- def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
+ def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = {
+ import spark.implicits._
+ implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
- override def zero: Publication = new Publication()
+ val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
- override def reduce(b: Publication, a: (String, Publication)): Publication = {
- b.mergeFrom(a._2)
- b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
- if (b.getId == null)
- b.setId(a._2.getId)
- b
- }
+ spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
+ val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
- override def merge(wx: Publication, wy: Publication): Publication = {
- wx.mergeFrom(wy)
- wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
- if(wx.getId == null && wy.getId.nonEmpty)
- wx.setId(wy.getId)
- wx
- }
- override def finish(reduction: Publication): Publication = reduction
+ spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
- override def bufferEncoder: Encoder[Publication] =
- Encoders.kryo(classOf[Publication])
+ val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
- override def outputEncoder: Encoder[Publication] =
- Encoders.kryo(classOf[Publication])
+ val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
+
+ works.joinWith(authors, authors("oid").equalTo(works("oid")))
+ .map(i =>{
+ val doi = i._1.doi
+ val author = i._2
+ (doi, author)
+ }).groupBy(col("_1").alias("doi"))
+ .agg(collect_list(col("_2")).alias("authors"))
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
+
+ val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
+
+ logger.info("Converting ORCID to OAF")
+ dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
}
-def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
- implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
- implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
- implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
-
- val mapper = new ObjectMapper()
- mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
-
- val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
-
- logger.info("Converting ORCID to OAF")
- dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
- .map(d => (d.getId, d))
- .groupByKey(_._1)(Encoders.STRING)
- .agg(getPublicationAggregator().toColumn)
- .map(p => p._2)
- .write.mode(SaveMode.Overwrite).save(targetPath)
-}
-
def main(args: Array[String]): Unit = {
-
-
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
parser.parseArgument(args)
@@ -78,11 +55,11 @@ def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
.master(parser.get("master")).getOrCreate()
-
val sourcePath = parser.get("sourcePath")
+ val workingPath = parser.get("workingPath")
val targetPath = parser.get("targetPath")
- run(spark, sourcePath, targetPath)
+ run(spark, sourcePath, workingPath, targetPath)
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala
index a72e4b0d6..9ac6a0838 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala
@@ -40,4 +40,4 @@ object SparkMapUnpayWallToOAF {
d.write.mode(SaveMode.Overwrite).save(targetPath)
}
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
index 3f5805b62..77aa595f5 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
@@ -54,6 +54,11 @@
+
+ MAGDumpPath
+ the MAG dump working path
+
+
inputPathMAG
the MAG working path
@@ -69,6 +74,11 @@
inputPathOrcid
+ the ORCID input path
+
+
+
+ workingPathOrcid
the ORCID working path
@@ -121,24 +131,27 @@
-
- yarn-cluster
- cluster
- GenerateCrossrefDataset
- eu.dnetlib.doiboost.crossref.CrossrefDataset
- dhp-doiboost-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
-
- --workingPath${inputPathCrossref}
- --masteryarn-cluster
-
-
-
+
+ yarn-cluster
+ cluster
+ GenerateCrossrefDataset
+ eu.dnetlib.doiboost.crossref.CrossrefDataset
+ dhp-doiboost-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --workingPath${inputPathCrossref}
+ --masteryarn-cluster
+
+
+
@@ -147,6 +160,43 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ Convert Mag to Dataset
+ eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset
+ dhp-doiboost-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --sourcePath${MAGDumpPath}
+ --targetPath${inputPathMAG}/dataset
+ --masteryarn-cluster
+
@@ -164,46 +214,15 @@
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--sourcePath${inputPathCrossref}/crossref_ds
--targetPath${workingPath}
--masteryarn-cluster
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn-cluster
- cluster
- Convert Mag to Dataset
- eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset
- dhp-doiboost-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- ${sparkExtraOPT}
-
- --sourcePath${inputPathMAG}/input
- --targetPath${inputPathMAG}/dataset
- --masteryarn-cluster
-
@@ -216,11 +235,14 @@
eu.dnetlib.doiboost.mag.SparkProcessMAG
dhp-doiboost-${projectVersion}.jar
- --executor-memory=${sparkExecutorMemory}
+ --executor-memory=${sparkExecutorIntersectionMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--sourcePath${inputPathMAG}/dataset
--workingPath${inputPathMAG}/process
@@ -245,10 +267,14 @@
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--sourcePath${inputPathUnpayWall}/uw_extracted
- --targetPath${workingPath}
+ --targetPath${workingPath}/uwPublication
--masteryarn-cluster
@@ -268,10 +294,14 @@
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--sourcePath${inputPathOrcid}
- --targetPath${workingPath}
+ --workingPath${workingPathOrcid}
+ --targetPath${workingPath}/orcidPublication
--masteryarn-cluster
@@ -291,11 +321,15 @@
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
+ --conf spark.sql.shuffle.partitions=3840
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--hostedByMapPath${hostedByMapPath}
- --affiliationPath${inputPathMAG}/process/Affiliations
- --paperAffiliationPath${inputPathMAG}/process/PaperAuthorAffiliations
+ --affiliationPath${inputPathMAG}/dataset/Affiliations
+ --paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations
--workingPath${workingPath}
--masteryarn-cluster
@@ -316,7 +350,10 @@
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
- ${sparkExtraOPT}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--dbPublicationPath${workingPath}/doiBoostPublicationFiltered
--dbDatasetPath${workingPath}/crossrefDataset