From 1adfc41d23f1f28928a5203c1514da45a4fd6078 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Wed, 5 May 2021 10:23:32 +0200 Subject: [PATCH] merged manually changes on stable_id for doiboost into master --- dhp-common/pom.xml | 2 +- .../doiboost/crossref/Crossref2Oaf.scala | 156 +++++++++-------- .../doiboost/crossref/CrossrefDataset.scala | 2 +- .../crossref/SparkMapDumpIntoOAF.scala | 2 +- .../dnetlib/doiboost/mag/MagDataModel.scala | 25 +-- .../mag/SparkImportMagIntoDataset.scala | 8 +- .../doiboost/mag/SparkProcessMAG.scala | 21 ++- .../dnetlib/doiboost/orcid/ORCIDToOAF.scala | 100 ++++++++--- .../orcid/SparkConvertORCIDToOAF.scala | 79 +++------ .../doiboost/uw/SparkMapUnpayWallToOAF.scala | 2 +- .../dhp/doiboost/oozie_app/workflow.xml | 163 +++++++++++------- 11 files changed, 321 insertions(+), 239 deletions(-) diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index c4c8aeb61..4b5a8f2cc 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -6,7 +6,7 @@ eu.dnetlib.dhp dhp 1.2.4-SNAPSHOT - ../ + ../pom.xml dhp-common diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index d1ceb9a07..db28eaf7a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -1,5 +1,6 @@ package eu.dnetlib.doiboost.crossref +import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.doiboost.DoiBoostMappingUtil._ @@ -13,12 +14,13 @@ import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex +import eu.dnetlib.dhp.schema.scholexplorer.OafUtils case class CrossrefDT(doi: String, json:String, timestamp: Long) {} case class mappingAffiliation(name: String) {} -case class mappingAuthor(given: Option[String], family: String, ORCID: Option[String], affiliation: Option[mappingAffiliation]) {} +case class mappingAuthor(given: Option[String], family: String, sequence:Option[String], ORCID: Option[String], affiliation: Option[mappingAffiliation]) {} case class mappingFunder(name: String, DOI: Option[String], award: Option[List[String]]) {} @@ -154,7 +156,12 @@ case object Crossref2Oaf { //Mapping Author val authorList: List[mappingAuthor] = (json \ "author").extractOrElse[List[mappingAuthor]](List()) - result.setAuthor(authorList.map(a => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull)).asJava) + + + + val sorted_list = authorList.sortWith((a:mappingAuthor, b:mappingAuthor) => a.sequence.isDefined && a.sequence.get.equalsIgnoreCase("first")) + + result.setAuthor(sorted_list.zipWithIndex.map{case (a, index) => generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull, index)}.asJava) // Mapping instance val instance = new Instance() @@ -170,14 +177,14 @@ case object Crossref2Oaf { if(has_review != JNothing) { instance.setRefereed( - createQualifier("0001", "peerReviewed", "dnet:review_levels", "dnet:review_levels")) + OafUtils.createQualifier("0001", "peerReviewed", ModelConstants.DNET_REVIEW_LEVELS, ModelConstants.DNET_REVIEW_LEVELS)) } instance.setAccessright(getRestrictedQualifier()) result.setInstance(List(instance).asJava) - instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource")) - result.setResourcetype(createQualifier(cobjCategory.substring(0, 4),"dnet:dataCite_resource")) + instance.setInstancetype(OafUtils.createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE)) + result.setResourcetype(OafUtils.createQualifier(cobjCategory.substring(0, 4),"dnet:dataCite_resource")) instance.setCollectedfrom(createCrossrefCollectedFrom()) if (StringUtils.isNotBlank(issuedDate)) { @@ -194,13 +201,14 @@ case object Crossref2Oaf { } - def generateAuhtor(given: String, family: String, orcid: String): Author = { + def generateAuhtor(given: String, family: String, orcid: String, index:Int): Author = { val a = new Author a.setName(given) a.setSurname(family) a.setFullname(s"$given $family") + a.setRank(index+1) if (StringUtils.isNotBlank(orcid)) - a.setPid(List(createSP(orcid, ORCID_PENDING, PID_TYPES, generateDataInfo())).asJava) + a.setPid(List(createSP(orcid, ModelConstants.ORCID_PENDING, ModelConstants.DNET_PID_TYPES, generateDataInfo())).asJava) a } @@ -221,7 +229,7 @@ case object Crossref2Oaf { val result = generateItemFromType(objectType, objectSubType) if (result == null) return List() - val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type")); + val cOBJCategory = mappingCrossrefSubType.getOrElse(objectType, mappingCrossrefSubType.getOrElse(objectSubType, "0038 Other literature type")) mappingResult(result, json, cOBJCategory) @@ -299,77 +307,77 @@ case object Crossref2Oaf { if (funders != null) - funders.foreach(funder => { - if (funder.DOI.isDefined && funder.DOI.get.nonEmpty) { - funder.DOI.get match { - case "10.13039/100010663" | - "10.13039/100010661" | - "10.13039/501100007601" | - "10.13039/501100000780" | - "10.13039/100010665" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) - case "10.13039/100011199" | - "10.13039/100004431" | - "10.13039/501100004963" | - "10.13039/501100000780" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward) - case "10.13039/501100000781" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward) - generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) - case "10.13039/100000001" => generateSimpleRelationFromAward(funder, "nsf_________", a => a) - case "10.13039/501100001665" => generateSimpleRelationFromAward(funder, "anr_________", a => a) - case "10.13039/501100002341" => generateSimpleRelationFromAward(funder, "aka_________", a => a) - case "10.13039/501100001602" => generateSimpleRelationFromAward(funder, "aka_________", a => a.replace("SFI", "")) - case "10.13039/501100000923" => generateSimpleRelationFromAward(funder, "arc_________", a => a) - case "10.13039/501100000038"=> val targetId = getProjectId("nserc_______" , "1e5e62235d094afd01cd56e65112fc63") - queue += generateRelation(sourceId, targetId, "isProducedBy" ) - queue += generateRelation(targetId, sourceId, "produces" ) - case "10.13039/501100000155"=> val targetId = getProjectId("sshrc_______" , "1e5e62235d094afd01cd56e65112fc63") - queue += generateRelation(sourceId,targetId, "isProducedBy" ) - queue += generateRelation(targetId,sourceId, "produces" ) - case "10.13039/501100000024"=> val targetId = getProjectId("cihr________" , "1e5e62235d094afd01cd56e65112fc63") - queue += generateRelation(sourceId,targetId, "isProducedBy" ) - queue += generateRelation(targetId,sourceId, "produces" ) - case "10.13039/501100002848" => generateSimpleRelationFromAward(funder, "conicytf____", a => a) - case "10.13039/501100003448" => generateSimpleRelationFromAward(funder, "gsrt________", extractECAward) - case "10.13039/501100010198" => generateSimpleRelationFromAward(funder, "sgov________", a=>a) - case "10.13039/501100004564" => generateSimpleRelationFromAward(funder, "mestd_______", extractECAward) - case "10.13039/501100003407" => generateSimpleRelationFromAward(funder, "miur________", a=>a) - val targetId = getProjectId("miur________" , "1e5e62235d094afd01cd56e65112fc63") - queue += generateRelation(sourceId,targetId, "isProducedBy" ) - queue += generateRelation(targetId,sourceId, "produces" ) - case "10.13039/501100006588" | - "10.13039/501100004488" => generateSimpleRelationFromAward(funder, "irb_hr______", a=>a.replaceAll("Project No.", "").replaceAll("HRZZ-","") ) - case "10.13039/501100006769"=> generateSimpleRelationFromAward(funder, "rsf_________", a=>a) - case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", snsfRule) - case "10.13039/501100004410"=> generateSimpleRelationFromAward(funder, "tubitakf____", a =>a) - case "10.10.13039/100004440"=> generateSimpleRelationFromAward(funder, "wt__________", a =>a) - case "10.13039/100004440"=> val targetId = getProjectId("wt__________" , "1e5e62235d094afd01cd56e65112fc63") - queue += generateRelation(sourceId,targetId, "isProducedBy" ) - queue += generateRelation(targetId,sourceId, "produces" ) + funders.foreach(funder => { + if (funder.DOI.isDefined && funder.DOI.get.nonEmpty) { + funder.DOI.get match { + case "10.13039/100010663" | + "10.13039/100010661" | + "10.13039/501100007601" | + "10.13039/501100000780" | + "10.13039/100010665" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + case "10.13039/100011199" | + "10.13039/100004431" | + "10.13039/501100004963" | + "10.13039/501100000780" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward) + case "10.13039/501100000781" => generateSimpleRelationFromAward(funder, "corda_______", extractECAward) + generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + case "10.13039/100000001" => generateSimpleRelationFromAward(funder, "nsf_________", a => a) + case "10.13039/501100001665" => generateSimpleRelationFromAward(funder, "anr_________", a => a) + case "10.13039/501100002341" => generateSimpleRelationFromAward(funder, "aka_________", a => a) + case "10.13039/501100001602" => generateSimpleRelationFromAward(funder, "aka_________", a => a.replace("SFI", "")) + case "10.13039/501100000923" => generateSimpleRelationFromAward(funder, "arc_________", a => a) + case "10.13039/501100000038"=> val targetId = getProjectId("nserc_______" , "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, "isProducedBy" ) + queue += generateRelation(targetId, sourceId, "produces" ) + case "10.13039/501100000155"=> val targetId = getProjectId("sshrc_______" , "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId,targetId, "isProducedBy" ) + queue += generateRelation(targetId,sourceId, "produces" ) + case "10.13039/501100000024"=> val targetId = getProjectId("cihr________" , "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId,targetId, "isProducedBy" ) + queue += generateRelation(targetId,sourceId, "produces" ) + case "10.13039/501100002848" => generateSimpleRelationFromAward(funder, "conicytf____", a => a) + case "10.13039/501100003448" => generateSimpleRelationFromAward(funder, "gsrt________", extractECAward) + case "10.13039/501100010198" => generateSimpleRelationFromAward(funder, "sgov________", a=>a) + case "10.13039/501100004564" => generateSimpleRelationFromAward(funder, "mestd_______", extractECAward) + case "10.13039/501100003407" => generateSimpleRelationFromAward(funder, "miur________", a=>a) + val targetId = getProjectId("miur________" , "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId,targetId, "isProducedBy" ) + queue += generateRelation(targetId,sourceId, "produces" ) + case "10.13039/501100006588" | + "10.13039/501100004488" => generateSimpleRelationFromAward(funder, "irb_hr______", a=>a.replaceAll("Project No.", "").replaceAll("HRZZ-","") ) + case "10.13039/501100006769"=> generateSimpleRelationFromAward(funder, "rsf_________", a=>a) + case "10.13039/501100001711"=> generateSimpleRelationFromAward(funder, "snsf________", snsfRule) + case "10.13039/501100004410"=> generateSimpleRelationFromAward(funder, "tubitakf____", a =>a) + case "10.10.13039/100004440"=> generateSimpleRelationFromAward(funder, "wt__________", a =>a) + case "10.13039/100004440"=> val targetId = getProjectId("wt__________" , "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId,targetId, "isProducedBy" ) + queue += generateRelation(targetId,sourceId, "produces" ) - case _ => logger.debug("no match for "+funder.DOI.get ) + case _ => logger.debug("no match for "+funder.DOI.get ) + } + + + } else { + funder.name match { + case "European Union’s Horizon 2020 research and innovation program" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + case "European Union's" => + generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) + generateSimpleRelationFromAward(funder, "corda_______", extractECAward) + case "The French National Research Agency (ANR)" | + "The French National Research Agency" => generateSimpleRelationFromAward(funder, "anr_________", a => a) + case "CONICYT, Programa de Formación de Capital Humano Avanzado" => generateSimpleRelationFromAward(funder, "conicytf____", extractECAward) + case "Wellcome Trust Masters Fellowship" => val targetId = getProjectId("wt__________", "1e5e62235d094afd01cd56e65112fc63") + queue += generateRelation(sourceId, targetId, "isProducedBy" ) + queue += generateRelation(targetId, sourceId, "produces" ) + case _ => logger.debug("no match for "+funder.name ) + + } } - - } else { - funder.name match { - case "European Union’s Horizon 2020 research and innovation program" => generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) - case "European Union's" => - generateSimpleRelationFromAward(funder, "corda__h2020", extractECAward) - generateSimpleRelationFromAward(funder, "corda_______", extractECAward) - case "The French National Research Agency (ANR)" | - "The French National Research Agency" => generateSimpleRelationFromAward(funder, "anr_________", a => a) - case "CONICYT, Programa de Formación de Capital Humano Avanzado" => generateSimpleRelationFromAward(funder, "conicytf____", extractECAward) - case "Wellcome Trust Masters Fellowship" => val targetId = getProjectId("wt__________", "1e5e62235d094afd01cd56e65112fc63") - queue += generateRelation(sourceId, targetId, "isProducedBy" ) - queue += generateRelation(targetId, sourceId, "produces" ) - case _ => logger.debug("no match for "+funder.name ) - - } } - - } - ) + ) queue.toList } @@ -465,4 +473,4 @@ case object Crossref2Oaf { null } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala index 4a39a2987..235305fb8 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala @@ -96,4 +96,4 @@ object CrossrefDataset { } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index 0272cb1a6..0036459bf 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -52,4 +52,4 @@ object SparkMapDumpIntoOAF { } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 7bb4686cf..09b741b47 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -1,6 +1,7 @@ package eu.dnetlib.doiboost.mag +import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty} import eu.dnetlib.doiboost.DoiBoostMappingUtil import org.json4s @@ -31,11 +32,11 @@ case class MagAffiliation(AffiliationId: Long, Rank: Int, NormalizedName: String case class MagPaperAuthorAffiliation(PaperId: Long, AuthorId: Long, AffiliationId: Option[Long], AuthorSequenceNumber: Int, OriginalAuthor: String, OriginalAffiliation: String) {} -case class MagAuthorAffiliation(author: MagAuthor, affiliation:String) +case class MagAuthorAffiliation(author: MagAuthor, affiliation:String, sequenceNumber:Int) case class MagPaperWithAuthorList(PaperId: Long, authors: List[MagAuthorAffiliation]) {} -case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String) {} +case class MagPaperAuthorDenormalized(PaperId: Long, author: MagAuthor, affiliation:String, sequenceNumber:Int) {} case class MagPaperUrl(PaperId: Long, SourceType: Option[Int], SourceUrl: Option[String], LanguageCode: Option[String]) {} @@ -202,12 +203,12 @@ case object ConversionUtil { val authorsOAF = authors.authors.map { f: MagAuthorAffiliation => val a: eu.dnetlib.dhp.schema.oaf.Author = new eu.dnetlib.dhp.schema.oaf.Author - - a.setFullname(f.author.DisplayName.get) - + a.setRank(f.sequenceNumber) + if (f.author.DisplayName.isDefined) + a.setFullname(f.author.DisplayName.get) if(f.affiliation!= null) a.setAffiliation(List(asField(f.affiliation)).asJava) - a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava) + a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", ModelConstants.DNET_PID_TYPES)).asJava) a } pub.setAuthor(authorsOAF.asJava) @@ -274,7 +275,7 @@ case object ConversionUtil { a.setAffiliation(List(asField(f.affiliation)).asJava) - a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", PID_TYPES)).asJava) + a.setPid(List(createSP(s"https://academic.microsoft.com/#/detail/${f.author.AuthorId}", "URL", ModelConstants.DNET_PID_TYPES)).asJava) a @@ -305,12 +306,12 @@ case object ConversionUtil { for {(k: String, v: List[Int]) <- iid} { v.foreach(item => res(item) = k) } - (0 until idl).foreach(i => { - if (res(i) == null) - res(i) = "" - }) + (0 until idl).foreach(i => { + if (res(i) == null) + res(i) = "" + }) return res.mkString(" ") } "" } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala index 88fee72b7..de4e07655 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -6,10 +6,10 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types._ import org.slf4j.{Logger, LoggerFactory} -import org.apache.spark.sql.functions._ object SparkImportMagIntoDataset { val datatypedict = Map( + "bool" -> BooleanType, "int" -> IntegerType, "uint" -> IntegerType, "long" -> LongType, @@ -25,11 +25,10 @@ object SparkImportMagIntoDataset { "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")), "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), - "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")), "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")), "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")), - // ['FieldOfStudyId:long', 'Rank:uint', 'NormalizedName:string', 'DisplayName:string', 'MainType:string', 'Level:int', 'PaperCount:long', 'PaperFamilyCount:long', 'CitationCount:long', 'CreatedDate:DateTime'] "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")), "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")), @@ -37,6 +36,7 @@ object SparkImportMagIntoDataset { "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")), "PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")), "PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")), + "PaperMeSH" -> Tuple2("advanced/PaperMeSH.txt", Seq("PaperId:long", "DescriptorUI:string", "DescriptorName:string", "QualifierUI:string", "QualifierName:string", "IsMajorTopic:bool")), "PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")), "PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")), "PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")), @@ -91,4 +91,4 @@ object SparkImportMagIntoDataset { } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala index 780e65c1e..0dcef4176 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala @@ -58,16 +58,16 @@ object SparkProcessMAG { val paperAuthorAffiliation = spark.read.load(s"$sourcePath/PaperAuthorAffiliations").as[MagPaperAuthorAffiliation] paperAuthorAffiliation.joinWith(authors, paperAuthorAffiliation("AuthorId").equalTo(authors("AuthorId"))) - .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null)) } + .map { case (a: MagPaperAuthorAffiliation, b: MagAuthor) => (a.AffiliationId, MagPaperAuthorDenormalized(a.PaperId, b, null, a.AuthorSequenceNumber)) } .joinWith(affiliation, affiliation("AffiliationId").equalTo(col("_1")), "left") .map(s => { val mpa = s._1._2 val af = s._2 if (af != null) { - MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName) + MagPaperAuthorDenormalized(mpa.PaperId, mpa.author, af.DisplayName, mpa.sequenceNumber) } else mpa - }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) + }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation", $"sequenceNumber")).as("authors")) .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors") logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") @@ -86,7 +86,7 @@ object SparkProcessMAG { var magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingPath/merge_step_2").as[Publication] - .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val conference = spark.read.load(s"$sourcePath/ConferenceInstances") @@ -115,10 +115,9 @@ object SparkProcessMAG { .save(s"$workingPath/merge_step_3") -// logger.info("Phase 6) Enrich Publication with description") -// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] -// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") - + // logger.info("Phase 6) Enrich Publication with description") + // val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] + // pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") val paperAbstract = spark.read.load((s"$workingPath/PaperAbstract")).as[MagPaperAbstract] @@ -127,7 +126,7 @@ object SparkProcessMAG { magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left") .map(item => ConversionUtil.updatePubsWithDescription(item) - ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") + ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") logger.info("Phase 7) Enrich Publication with FieldOfStudy") @@ -153,9 +152,9 @@ object SparkProcessMAG { val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication] .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b)) - .map(_._2) + .map(_._2) spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication") } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala index e9773dbbb..e4b808085 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala @@ -1,17 +1,25 @@ package eu.dnetlib.doiboost.orcid import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication} -import eu.dnetlib.dhp.schema.orcid.OrcidDOI +import eu.dnetlib.dhp.schema.orcid.{AuthorData, OrcidDOI} import eu.dnetlib.doiboost.DoiBoostMappingUtil -import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier} +import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo} import org.apache.commons.lang.StringUtils import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods._ -case class ORCIDItem(oid:String,name:String,surname:String,creditName:String,errorCode:String){} +case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){} +case class OrcidAuthor(oid:String, name:Option[String], surname:Option[String], creditName:Option[String], otherNames:Option[List[String]], errorCode:Option[String]){} +case class OrcidWork(oid:String, doi:String) + @@ -44,17 +52,65 @@ object ORCIDToOAF { } - def convertTOOAF(input:OrcidDOI) :Publication = { - val doi = input.getDoi + def strValid(s:Option[String]) : Boolean = { + s.isDefined && s.get.nonEmpty + } + + def authorValid(author:OrcidAuthor): Boolean ={ + if (strValid(author.name) && strValid(author.surname)) { + return true + } + if (strValid(author.surname)) { + return true + } + if (strValid(author.creditName)) { + return true + + } + false + } + + + def extractDOIWorks(input:String): List[OrcidWork] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + val oid = (json \ "workDetail" \"oid").extractOrElse[String](null) + if (oid == null) + return List() + val doi:List[(String, String)] = for { + JObject(extIds) <- json \ "workDetail" \"extIds" + JField("type", JString(typeValue)) <- extIds + JField("value", JString(value)) <- extIds + if "doi".equalsIgnoreCase(typeValue) + } yield (typeValue, value) + if (doi.nonEmpty) { + return doi.map(l =>OrcidWork(oid, l._2)) + } + List() + } + + def convertORCIDAuthor(input:String): OrcidAuthor = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + (json \"authorData" ).extractOrElse[OrcidAuthor](null) + } + + + def convertTOOAF(input:ORCIDItem) :Publication = { + val doi = input.doi val pub:Publication = new Publication - pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava) + pub.setPid(List(createSP(doi.toLowerCase, "doi", ModelConstants.DNET_PID_TYPES)).asJava) pub.setDataInfo(generateDataInfo()) - pub.setId(generateIdentifier(pub, doi.toLowerCase)) + + pub.setId(DoiBoostMappingUtil.generateIdentifier(pub, doi.toLowerCase)) + try{ - val l:List[Author]= input.getAuthors.asScala.map(a=> { - generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid) - })(collection.breakOut) + val l:List[Author]= input.authors.map(a=> { + generateAuthor(a) + })(collection.breakOut) pub.setAuthor(l.asJava) pub.setCollectedfrom(List(DoiBoostMappingUtil.createORIDCollectedFrom()).asJava) @@ -74,19 +130,23 @@ object ORCIDToOAF { di } - def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = { + def generateAuthor(o : OrcidAuthor): Author = { val a = new Author - a.setName(given) - a.setSurname(family) - if (fullName!= null && fullName.nonEmpty) - a.setFullname(fullName) - else - a.setFullname(s"$given $family") - if (StringUtils.isNotBlank(orcid)) - a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateOricPIDDatainfo())).asJava) + if (strValid(o.name)) { + a.setName(o.name.get.capitalize) + } + if (strValid(o.surname)) { + a.setSurname(o.surname.get.capitalize) + } + if(strValid(o.name) && strValid(o.surname)) + a.setFullname(s"${o.name.get.capitalize} ${o.surname.get.capitalize}") + else if (strValid(o.creditName)) + a.setFullname(o.creditName.get) + if (StringUtils.isNotBlank(o.oid)) + a.setPid(List(createSP(o.oid, ModelConstants.ORCID, ModelConstants.DNET_PID_TYPES, generateOricPIDDatainfo())).asJava) a } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index f1c7c58b4..025d68b90 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -1,72 +1,49 @@ package eu.dnetlib.doiboost.orcid -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.oaf.Publication -import eu.dnetlib.dhp.schema.orcid.OrcidDOI -import eu.dnetlib.doiboost.mag.ConversionUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.expressions.Aggregator -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{ + def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = { + import spark.implicits._ + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - override def zero: Publication = new Publication() + val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) - override def reduce(b: Publication, a: (String, Publication)): Publication = { - b.mergeFrom(a._2) - b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) - if (b.getId == null) - b.setId(a._2.getId) - b - } + spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") + val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) - override def merge(wx: Publication, wy: Publication): Publication = { - wx.mergeFrom(wy) - wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) - if(wx.getId == null && wy.getId.nonEmpty) - wx.setId(wy.getId) - wx - } - override def finish(reduction: Publication): Publication = reduction + spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") - override def bufferEncoder: Encoder[Publication] = - Encoders.kryo(classOf[Publication]) + val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] - override def outputEncoder: Encoder[Publication] = - Encoders.kryo(classOf[Publication]) + val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] + + works.joinWith(authors, authors("oid").equalTo(works("oid"))) + .map(i =>{ + val doi = i._1.doi + val author = i._2 + (doi, author) + }).groupBy(col("_1").alias("doi")) + .agg(collect_list(col("_2")).alias("authors")) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") + + val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] + + logger.info("Converting ORCID to OAF") + dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) } -def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = { - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI] - implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) - - val mapper = new ObjectMapper() - mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - - val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI]))) - - logger.info("Converting ORCID to OAF") - dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null) - .map(d => (d.getId, d)) - .groupByKey(_._1)(Encoders.STRING) - .agg(getPublicationAggregator().toColumn) - .map(p => p._2) - .write.mode(SaveMode.Overwrite).save(targetPath) -} - def main(args: Array[String]): Unit = { - - val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) parser.parseArgument(args) @@ -78,11 +55,11 @@ def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = { .master(parser.get("master")).getOrCreate() - val sourcePath = parser.get("sourcePath") + val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - run(spark, sourcePath, targetPath) + run(spark, sourcePath, workingPath, targetPath) } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala index a72e4b0d6..9ac6a0838 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala @@ -40,4 +40,4 @@ object SparkMapUnpayWallToOAF { d.write.mode(SaveMode.Overwrite).save(targetPath) } -} +} \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 3f5805b62..77aa595f5 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -54,6 +54,11 @@ + + MAGDumpPath + the MAG dump working path + + inputPathMAG the MAG working path @@ -69,6 +74,11 @@ inputPathOrcid + the ORCID input path + + + + workingPathOrcid the ORCID working path @@ -121,24 +131,27 @@ - - yarn-cluster - cluster - GenerateCrossrefDataset - eu.dnetlib.doiboost.crossref.CrossrefDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} - - --workingPath${inputPathCrossref} - --masteryarn-cluster - - - + + yarn-cluster + cluster + GenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.CrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --workingPath${inputPathCrossref} + --masteryarn-cluster + + + @@ -147,6 +160,43 @@ + + + + + + + + + + + + + + + + + + + yarn-cluster + cluster + Convert Mag to Dataset + eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePath${MAGDumpPath} + --targetPath${inputPathMAG}/dataset + --masteryarn-cluster + @@ -164,46 +214,15 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathCrossref}/crossref_ds --targetPath${workingPath} --masteryarn-cluster - - - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - Convert Mag to Dataset - eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset - dhp-doiboost-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - ${sparkExtraOPT} - - --sourcePath${inputPathMAG}/input - --targetPath${inputPathMAG}/dataset - --masteryarn-cluster - @@ -216,11 +235,14 @@ eu.dnetlib.doiboost.mag.SparkProcessMAG dhp-doiboost-${projectVersion}.jar - --executor-memory=${sparkExecutorMemory} + --executor-memory=${sparkExecutorIntersectionMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathMAG}/dataset --workingPath${inputPathMAG}/process @@ -245,10 +267,14 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathUnpayWall}/uw_extracted - --targetPath${workingPath} + --targetPath${workingPath}/uwPublication --masteryarn-cluster @@ -268,10 +294,14 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${inputPathOrcid} - --targetPath${workingPath} + --workingPath${workingPathOrcid} + --targetPath${workingPath}/orcidPublication --masteryarn-cluster @@ -291,11 +321,15 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --hostedByMapPath${hostedByMapPath} - --affiliationPath${inputPathMAG}/process/Affiliations - --paperAffiliationPath${inputPathMAG}/process/PaperAuthorAffiliations + --affiliationPath${inputPathMAG}/dataset/Affiliations + --paperAffiliationPath${inputPathMAG}/dataset/PaperAuthorAffiliations --workingPath${workingPath} --masteryarn-cluster @@ -316,7 +350,10 @@ --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.sql.shuffle.partitions=3840 - ${sparkExtraOPT} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --dbPublicationPath${workingPath}/doiBoostPublicationFiltered --dbDatasetPath${workingPath}/crossrefDataset