From 302baab67b3de2e8f186a001ebc69e712058649a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 7 Dec 2020 19:59:33 +0100 Subject: [PATCH 1/4] fixed doiboost mapping and workflows --- .../eu/dnetlib/dhp/schema/orcid/OrcidDOI.java | 29 ++++---- .../doiboost/SparkGenerateDoiBoost.scala | 2 +- .../mag/SparkImportMagIntoDataset.scala | 14 ++-- .../doiboost/mag/SparkPreProcessMAG.scala | 42 ++++++------ .../dnetlib/doiboost/orcid/ORCIDToOAF.scala | 27 +++++--- .../orcid/SparkConvertORCIDToOAF.scala | 66 +++++++++++++++---- .../intersection/oozie_app/workflow.xml | 9 +-- .../dhp/doiboost/mag/oozie_app/workflow.xml | 19 ++++-- .../doiboost/mag/preprocess_mag_params.json | 3 +- .../orcid/MappingORCIDToOAFTest.scala | 27 ++++++++ 10 files changed, 163 insertions(+), 75 deletions(-) diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java index 11bce26c8..cf372c12a 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java @@ -1,24 +1,25 @@ + package eu.dnetlib.dhp.schema.orcid; import java.util.List; public class OrcidDOI { - private String doi; - private List authors; + private String doi; + private List authors; - public String getDoi() { - return doi; - } + public String getDoi() { + return doi; + } - public void setDoi(String doi) { - this.doi = doi; - } + public void setDoi(String doi) { + this.doi = doi; + } - public List getAuthors() { - return authors; - } + public List getAuthors() { + return authors; + } - public void setAuthors(List authors) { - this.authors = authors; - } + public void setAuthors(List authors) { + this.authors = authors; + } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index a29809fc0..860254527 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -62,7 +62,7 @@ object SparkGenerateDoiBoost { val orcidPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p)) fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/secondJoin") - logger.info("Phase 3) Join Result with MAG") + logger.info("Phase 4) Join Result with MAG") val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p)) val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p)) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala index f291a92f9..88fee72b7 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -21,15 +21,17 @@ object SparkImportMagIntoDataset { val stream = Map( - "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", 
"DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), - "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), - "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), + "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), + "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")), + "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")), "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")), "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")), - "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), - "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")), + // ['FieldOfStudyId:long', 'Rank:uint', 'NormalizedName:string', 'DisplayName:string', 'MainType:string', 'Level:int', 'PaperCount:long', 'PaperFamilyCount:long', 
'CitationCount:long', 'CreatedDate:DateTime'] + "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), + "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")), "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")), "PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")), "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")), @@ -39,7 +41,7 @@ object SparkImportMagIntoDataset { "PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")), "PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")), "PaperUrls" -> Tuple2("mag/PaperUrls.txt", Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")), - "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "CreatedDate:DateTime")), + "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "OnlineDate:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "FamilyRank:uint?", "CreatedDate:DateTime")), "RelatedFieldOfStudy" -> Tuple2("advanced/RelatedFieldOfStudy.txt", Seq("FieldOfStudyId1:long", "Type1:string", "FieldOfStudyId2:long", "Type2:string", "Rank:float")) ) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index a24f0e6bb..02dc4979a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -26,12 +26,15 @@ object SparkPreProcessMAG { .master(parser.get("master")).getOrCreate() val sourcePath = parser.get("sourcePath") + val workingPath = parser.get("workingPath") + val targetPath = parser.get("targetPath") + import spark.implicits._ implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication] implicit val tupleForJoinEncoder: 
Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) logger.info("Phase 1) make uninque DOI in Papers:") - val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers] + val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers] // Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one val result: RDD[MagPapers] = d.where(col("Doi").isNotNull) @@ -41,11 +44,12 @@ object SparkPreProcessMAG { .map(_._2) val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) - distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") + + distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct") logger.info("Phase 0) Enrich Publication with description") - val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] - pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") + val pa = spark.read.load(s"$sourcePath/PaperAbstractsInvertedIndex").as[MagPaperAbstract] + pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"$workingPath/PaperAbstract") logger.info("Phase 3) Group Author by PaperId") val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor] @@ -64,24 +68,24 @@ object SparkPreProcessMAG { } else mpa }).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors")) - .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors") logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors") val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal] - val papers = spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers] + val papers = spark.read.load((s"$workingPath/Papers_distinct")).as[MagPapers] - val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList] + val paperWithAuthors = spark.read.load(s"$workingPath/merge_step_1_paper_authors").as[MagPaperWithAuthorList] val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left") firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left") .map { a => ConversionUtil.createOAFFromJournalAuthorPaper(a) } - .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2") + .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_2") var magPubs: Dataset[(String, Publication)] = - spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication] + spark.read.load(s"$workingPath/merge_step_2").as[Publication] .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] @@ -95,10 +99,10 @@ object SparkPreProcessMAG { .map(item => ConversionUtil.updatePubsWithConferenceInfo(item)) .write .mode(SaveMode.Overwrite) - .save(s"${parser.get("targetPath")}/merge_step_2_conference") + .save(s"$workingPath/merge_step_2_conference") - magPubs= spark.read.load(s"${parser.get("targetPath")}/merge_step_2_conference").as[Publication] + magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication] .map(p => 
(ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl] @@ -108,27 +112,27 @@ object SparkPreProcessMAG { magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left") .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) } .write.mode(SaveMode.Overwrite) - .save(s"${parser.get("targetPath")}/merge_step_3") + .save(s"$workingPath/merge_step_3") // logger.info("Phase 6) Enrich Publication with description") // val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] // pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") - val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract] + val paperAbstract = spark.read.load((s"$workingPath/PaperAbstract")).as[MagPaperAbstract] - magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication] + magPubs = spark.read.load(s"$workingPath/merge_step_3").as[Publication] .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left") .map(item => ConversionUtil.updatePubsWithDescription(item) - ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4") + ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") logger.info("Phase 7) Enrich Publication with FieldOfStudy") - magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication] + magPubs = spark.read.load(s"$workingPath/merge_step_4").as[Publication] .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType") @@ -144,14 +148,14 @@ object SparkPreProcessMAG { .equalTo(paperField("PaperId")), "left") .map(item => ConversionUtil.updatePubsWithSubject(item)) .write.mode(SaveMode.Overwrite) - .save(s"${parser.get("targetPath")}/mag_publication") + .save(s"$workingPath/mag_publication") - val s:RDD[Publication] = spark.read.load(s"${parser.get("targetPath")}/mag_publication").as[Publication] + val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication] .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b)) .map(_._2) - spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication_u") + spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication") } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala index f230c604f..f7255e559 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala @@ -1,6 +1,7 @@ package eu.dnetlib.doiboost.orcid -import eu.dnetlib.dhp.schema.oaf.{Author, Publication} +import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication} +import 
eu.dnetlib.dhp.schema.orcid.OrcidDOI import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier} import org.apache.commons.lang.StringUtils @@ -43,16 +44,19 @@ object ORCIDToOAF { } - def convertTOOAF(input:ORCIDElement) :Publication = { - val doi = input.doi + def convertTOOAF(input:OrcidDOI) :Publication = { + val doi = input.getDoi val pub:Publication = new Publication - pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava) + pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava) pub.setDataInfo(generateDataInfo()) pub.setId(generateIdentifier(pub, doi.toLowerCase)) try{ - pub.setAuthor(input.authors.map(a=> { - generateAuthor(a.name, a.surname, a.creditName, a.oid) - }).asJava) + + val l:List[Author]= input.getAuthors.asScala.map(a=> { + generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid) + })(collection.breakOut) + + pub.setAuthor(l.asJava) pub.setCollectedfrom(List(DoiBoostMappingUtil.createORIDCollectedFrom()).asJava) pub.setDataInfo(DoiBoostMappingUtil.generateDataInfo()) pub @@ -63,6 +67,13 @@ object ORCIDToOAF { } } + def generateOricPIDDatainfo():DataInfo = { + val di =DoiBoostMappingUtil.generateDataInfo("0.91") + di.getProvenanceaction.setClassid("sysimport:crosswalk:entityregistry") + di.getProvenanceaction.setClassname("Harvested") + di + } + def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = { val a = new Author a.setName(given) @@ -72,7 +83,7 @@ object ORCIDToOAF { else a.setFullname(s"$given $family") if (StringUtils.isNotBlank(orcid)) - a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava) + a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateOricPIDDatainfo())).asJava) a } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index 1cd9ba4d4..f1c7c58b4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -1,21 +1,72 @@ package eu.dnetlib.doiboost.orcid +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.oaf.Publication +import eu.dnetlib.dhp.schema.orcid.OrcidDOI import eu.dnetlib.doiboost.mag.ConversionUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} object SparkConvertORCIDToOAF { + val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) + + def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{ + + override def zero: Publication = new Publication() + + override def reduce(b: Publication, a: (String, Publication)): Publication = { + b.mergeFrom(a._2) + b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor)) + if (b.getId == null) + b.setId(a._2.getId) + b + } + override def merge(wx: Publication, wy: Publication): Publication = { + wx.mergeFrom(wy) + 
wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor)) + if(wx.getId == null && wy.getId.nonEmpty) + wx.setId(wy.getId) + wx + } + override def finish(reduction: Publication): Publication = reduction + + override def bufferEncoder: Encoder[Publication] = + Encoders.kryo(classOf[Publication]) + + override def outputEncoder: Encoder[Publication] = + Encoders.kryo(classOf[Publication]) + } + +def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = { + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] + implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI] + implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) + + val mapper = new ObjectMapper() + mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + + val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI]))) + + logger.info("Converting ORCID to OAF") + dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null) + .map(d => (d.getId, d)) + .groupByKey(_._1)(Encoders.STRING) + .agg(getPublicationAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(targetPath) +} def main(args: Array[String]): Unit = { - val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) + val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json"))) parser.parseArgument(args) @@ -26,19 +77,12 @@ object SparkConvertORCIDToOAF { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs) - import spark.implicits._ + + val sourcePath = parser.get("sourcePath") val targetPath = parser.get("targetPath") - val dataset:Dataset[ORCIDElement] = spark.read.json(sourcePath).as[ORCIDElement] + run(spark, sourcePath, targetPath) - - logger.info("Converting ORCID to OAF") - val d:RDD[Publication] = dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null).map(p=>(p.getId,p)).rdd.reduceByKey(ConversionUtil.mergePublication) - .map(_._2) - - spark.createDataset(d).as[Publication].write.mode(SaveMode.Overwrite).save(targetPath) } } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml index e35f88abd..dcde62c9d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml @@ -39,14 +39,7 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - + diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml index 2277b79b0..9d19dddc7 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml +++ 
b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
@@ -8,6 +8,10 @@ targetPath the working dir base path + + workingPath + the working dir base path + sparkDriverMemory memory for driver process
@@ -31,10 +35,10 @@ - - + + - +
@@ -52,10 +56,10 @@ ${sparkExtraOPT} --sourcePath${sourcePath} - --targetPath${targetPath} + --targetPath${workingPath} --masteryarn-cluster - +
@@ -65,7 +69,7 @@ yarn-cluster cluster - Convert Mag to Dataset + Convert Mag to OAF Dataset eu.dnetlib.doiboost.mag.SparkPreProcessMAG dhp-doiboost-${projectVersion}.jar
 --conf spark.sql.shuffle.partitions=3840 ${sparkExtraOPT}
- --sourcePath${sourcePath}
+ --sourcePath${workingPath}
+ --workingPath${workingPath}/process
 --targetPath${targetPath}
 --masteryarn-cluster
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
index bf0b80f69..d45f7269f 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
@@ -1,6 +1,7 @@
 [
 {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target dir path", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true},
 {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
index 5b8240942..0222b393d 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
@@ -1,5 +1,8 @@
 package eu.dnetlib.doiboost.orcid
+import eu.dnetlib.dhp.schema.oaf.Publication
+import eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF.getClass
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.codehaus.jackson.map.ObjectMapper
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -21,6 +24,30 @@ class MappingORCIDToOAFTest {
 })
 }
+// @Test
+// def testOAFConvert():Unit ={
+//
+// val spark: SparkSession =
+// SparkSession
+// .builder()
+// .appName(getClass.getSimpleName)
+// .master("local[*]").getOrCreate()
+//
+//
+// SparkConvertORCIDToOAF.run( spark,"/Users/sandro/Downloads/orcid", "/Users/sandro/Downloads/orcid_oaf")
+// implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
+//
+// val df = spark.read.load("/Users/sandro/Downloads/orcid_oaf").as[Publication]
+// println(df.first.getId)
+// println(mapper.writeValueAsString(df.first()))
+//
+//
+//
+//
+// }
+
+
+

From 197f286fa476f2670a268ca08b7db7ed4154734c Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 7 Dec 2020 21:52:17 +0100
Subject: [PATCH 2/4] removed duplicated dependency (org.apache.httpcomponents:httpclient)
---
 pom.xml | 12 
+++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index d06bdbe20..a2e2587b3 100644 --- a/pom.xml +++ b/pom.xml @@ -278,12 +278,12 @@ org.apache.httpcomponents httpclient - 4.5.3 + ${org.apache.httpcomponents.version} org.apache.httpcomponents httpmime - 4.5.3 + ${org.apache.httpcomponents.version} org.noggit @@ -484,12 +484,6 @@ ${common.text.version} - - org.apache.httpcomponents - httpclient - ${org.apache.httpcomponents.version} - - @@ -719,7 +713,7 @@ 1.8 4.1.2 1.8 - 4.3.4 + 4.5.3 4.0.1 From 2fcc24b36e04b6ee52ab148d7ecaaa885ea25576 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 7 Dec 2020 21:52:32 +0100 Subject: [PATCH 3/4] code formatting --- .../eu/dnetlib/dhp/schema/orcid/OrcidDOI.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java index 11bce26c8..cf372c12a 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java @@ -1,24 +1,25 @@ + package eu.dnetlib.dhp.schema.orcid; import java.util.List; public class OrcidDOI { - private String doi; - private List authors; + private String doi; + private List authors; - public String getDoi() { - return doi; - } + public String getDoi() { + return doi; + } - public void setDoi(String doi) { - this.doi = doi; - } + public void setDoi(String doi) { + this.doi = doi; + } - public List getAuthors() { - return authors; - } + public List getAuthors() { + return authors; + } - public void setAuthors(List authors) { - this.authors = authors; - } + public void setAuthors(List authors) { + this.authors = authors; + } } From fba11eef2ab857ab16810c4f522be2d7e9b06127 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 7 Dec 2020 21:53:13 +0100 Subject: [PATCH 4/4] cleanup --- .../broker/oa/PartitionEventsByDsIdJob.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index 65d5e6f94..e9644122f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -18,6 +18,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.slf4j.Logger; @@ -40,8 +42,9 @@ public class PartitionEventsByDsIdJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(PartitionEventsByDsIdJob.class - .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); + .toString( + PartitionEventsByDsIdJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -63,21 +66,27 @@ public class PartitionEventsByDsIdJob { final Set validOpendoarIds = new HashSet<>(); if (!opendoarIds.trim().equals("-")) { - 
validOpendoarIds.addAll(Arrays.stream(opendoarIds.split(",")) .map(String::trim) .filter(StringUtils::isNotBlank) .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s)) .collect(Collectors.toSet()));
+ validOpendoarIds
+ .addAll(
+ Arrays
+ .stream(opendoarIds.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank)
+ .map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex(s))
+ .collect(Collectors.toSet()));
 }
+ log.info("validOpendoarIds: {}", validOpendoarIds);
 runWithSparkSession(conf, isSparkSessionManaged, spark -> {
 ClusterUtils
 .readPath(spark, eventsPath, Event.class)
- .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
- .filter(e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX))
- .filter(e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId()))
- .map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class))
+ .filter((FilterFunction<Event>) e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId()))
+ .filter((FilterFunction<Event>) e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX))
+ .filter((FilterFunction<Event>) e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId()))
+ .map(
+ (MapFunction<Event, ShortEventMessageWithGroupId>) e -> messageFromNotification(e),
+ Encoders.bean(ShortEventMessageWithGroupId.class))
 .coalesce(1)
 .write()
 .partitionBy("group")
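
Note on the typed lambdas added in PATCH 4/4: the explicit (FilterFunction<...>) and (MapFunction<...>) casts are the usual way to keep calls on a Java Dataset unambiguous, since Dataset.filter/map also accept Scala Function1 overloads that a bare Java lambda can match once Spark is built against Scala 2.12. The sketch below is only an illustration of that call pattern under those assumptions; the Item bean, the ids and the "keep::" prefix are invented for the example and are not part of this codebase.

import java.util.Arrays;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class TypedLambdaSketch {

	// minimal bean so Encoders.bean(...) can derive a schema (illustrative only)
	public static class Item {
		private String id;

		public String getId() {
			return id;
		}

		public void setId(String id) {
			this.id = id;
		}
	}

	public static void main(String[] args) {
		final SparkSession spark = SparkSession
			.builder()
			.appName("typed-lambda-sketch")
			.master("local[*]")
			.getOrCreate();

		final Item keep = new Item();
		keep.setId("keep::1");
		final Item drop = new Item();
		drop.setId("drop::2");

		final Dataset<Item> items = spark.createDataset(Arrays.asList(keep, drop), Encoders.bean(Item.class));

		items
			// the explicit cast pins the lambda to the Java functional-interface overload
			.filter((FilterFunction<Item>) i -> i.getId().startsWith("keep::"))
			.map((MapFunction<Item, String>) Item::getId, Encoders.STRING())
			.show(false);

		spark.stop();
	}
}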