diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
index a29809fc0..860254527 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
@@ -62,7 +62,7 @@ object SparkGenerateDoiBoost {
val orcidPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p))
fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/secondJoin")
- logger.info("Phase 3) Join Result with MAG")
+ logger.info("Phase 4) Join Result with MAG")
val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
index f291a92f9..88fee72b7 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala
@@ -21,15 +21,17 @@ object SparkImportMagIntoDataset {
val stream = Map(
- "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
- "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
- "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
+ "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
+ "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")),
+ "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
+ "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
"FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
"FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
- "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
- "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
+ // ['FieldOfStudyId:long', 'Rank:uint', 'NormalizedName:string', 'DisplayName:string', 'MainType:string', 'Level:int', 'PaperCount:long', 'PaperFamilyCount:long', 'CitationCount:long', 'CreatedDate:DateTime']
+ "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
+ "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")),
"PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
"PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")),
"PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
@@ -39,7 +41,7 @@ object SparkImportMagIntoDataset {
"PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")),
"PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")),
"PaperUrls" -> Tuple2("mag/PaperUrls.txt", Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")),
- "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "CreatedDate:DateTime")),
+ "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "OnlineDate:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "FamilyRank:uint?", "CreatedDate:DateTime")),
"RelatedFieldOfStudy" -> Tuple2("advanced/RelatedFieldOfStudy.txt", Seq("FieldOfStudyId1:long", "Type1:string", "FieldOfStudyId2:long", "Type2:string", "Rank:float"))
)
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
index a24f0e6bb..02dc4979a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
@@ -26,12 +26,15 @@ object SparkPreProcessMAG {
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
+ val workingPath = parser.get("workingPath")
+ val targetPath = parser.get("targetPath")
+
import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
logger.info("Phase 1) make uninque DOI in Papers:")
- val d: Dataset[MagPapers] = spark.read.load(s"${parser.get("sourcePath")}/Papers").as[MagPapers]
+ val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers]
// Filtering Papers with DOI, and since for the same DOI we have multiple version of item with different PapersId we get the last one
val result: RDD[MagPapers] = d.where(col("Doi").isNotNull)
@@ -41,11 +44,12 @@ object SparkPreProcessMAG {
.map(_._2)
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
- distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
+
+ distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct")
logger.info("Phase 0) Enrich Publication with description")
- val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
- pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
+ val pa = spark.read.load(s"$sourcePath/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
+ pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"$workingPath/PaperAbstract")
logger.info("Phase 3) Group Author by PaperId")
val authors = spark.read.load(s"$sourcePath/Authors").as[MagAuthor]
@@ -64,24 +68,24 @@ object SparkPreProcessMAG {
} else
mpa
}).groupBy("PaperId").agg(collect_list(struct($"author", $"affiliation")).as("authors"))
- .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_1_paper_authors")
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_1_paper_authors")
logger.info("Phase 4) create First Version of publication Entity with Paper Journal and Authors")
val journals = spark.read.load(s"$sourcePath/Journals").as[MagJournal]
- val papers = spark.read.load((s"${parser.get("targetPath")}/Papers_distinct")).as[MagPapers]
+ val papers = spark.read.load((s"$workingPath/Papers_distinct")).as[MagPapers]
- val paperWithAuthors = spark.read.load(s"${parser.get("targetPath")}/merge_step_1_paper_authors").as[MagPaperWithAuthorList]
+ val paperWithAuthors = spark.read.load(s"$workingPath/merge_step_1_paper_authors").as[MagPaperWithAuthorList]
val firstJoin = papers.joinWith(journals, papers("JournalId").equalTo(journals("JournalId")), "left")
firstJoin.joinWith(paperWithAuthors, firstJoin("_1.PaperId").equalTo(paperWithAuthors("PaperId")), "left")
.map { a => ConversionUtil.createOAFFromJournalAuthorPaper(a) }
- .write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_2")
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_2")
var magPubs: Dataset[(String, Publication)] =
- spark.read.load(s"${parser.get("targetPath")}/merge_step_2").as[Publication]
+ spark.read.load(s"$workingPath/merge_step_2").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
@@ -95,10 +99,10 @@ object SparkPreProcessMAG {
.map(item => ConversionUtil.updatePubsWithConferenceInfo(item))
.write
.mode(SaveMode.Overwrite)
- .save(s"${parser.get("targetPath")}/merge_step_2_conference")
+ .save(s"$workingPath/merge_step_2_conference")
- magPubs= spark.read.load(s"${parser.get("targetPath")}/merge_step_2_conference").as[Publication]
+ magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
@@ -108,27 +112,27 @@ object SparkPreProcessMAG {
magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
.map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
.write.mode(SaveMode.Overwrite)
- .save(s"${parser.get("targetPath")}/merge_step_3")
+ .save(s"$workingPath/merge_step_3")
// logger.info("Phase 6) Enrich Publication with description")
// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
- val paperAbstract = spark.read.load((s"${parser.get("targetPath")}/PaperAbstract")).as[MagPaperAbstract]
+ val paperAbstract = spark.read.load((s"$workingPath/PaperAbstract")).as[MagPaperAbstract]
- magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_3").as[Publication]
+ magPubs = spark.read.load(s"$workingPath/merge_step_3").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left")
.map(item => ConversionUtil.updatePubsWithDescription(item)
- ).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/merge_step_4")
+ ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4")
logger.info("Phase 7) Enrich Publication with FieldOfStudy")
- magPubs = spark.read.load(s"${parser.get("targetPath")}/merge_step_4").as[Publication]
+ magPubs = spark.read.load(s"$workingPath/merge_step_4").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val fos = spark.read.load(s"$sourcePath/FieldsOfStudy").select($"FieldOfStudyId".alias("fos"), $"DisplayName", $"MainType")
@@ -144,14 +148,14 @@ object SparkPreProcessMAG {
.equalTo(paperField("PaperId")), "left")
.map(item => ConversionUtil.updatePubsWithSubject(item))
.write.mode(SaveMode.Overwrite)
- .save(s"${parser.get("targetPath")}/mag_publication")
+ .save(s"$workingPath/mag_publication")
- val s:RDD[Publication] = spark.read.load(s"${parser.get("targetPath")}/mag_publication").as[Publication]
+ val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication]
.map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
.map(_._2)
- spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/mag_publication_u")
+ spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
}
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
index f230c604f..f7255e559 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
@@ -1,6 +1,7 @@
package eu.dnetlib.doiboost.orcid
-import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
+import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
+import eu.dnetlib.dhp.schema.orcid.OrcidDOI
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier}
import org.apache.commons.lang.StringUtils
@@ -43,16 +44,19 @@ object ORCIDToOAF {
}
- def convertTOOAF(input:ORCIDElement) :Publication = {
- val doi = input.doi
+ def convertTOOAF(input:OrcidDOI) :Publication = {
+ val doi = input.getDoi
val pub:Publication = new Publication
- pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
+ pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava)
pub.setDataInfo(generateDataInfo())
pub.setId(generateIdentifier(pub, doi.toLowerCase))
try{
- pub.setAuthor(input.authors.map(a=> {
- generateAuthor(a.name, a.surname, a.creditName, a.oid)
- }).asJava)
+
+ val l:List[Author]= input.getAuthors.asScala.map(a=> {
+ generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid)
+ })(collection.breakOut)
+
+ pub.setAuthor(l.asJava)
pub.setCollectedfrom(List(DoiBoostMappingUtil.createORIDCollectedFrom()).asJava)
pub.setDataInfo(DoiBoostMappingUtil.generateDataInfo())
pub
@@ -63,6 +67,13 @@ object ORCIDToOAF {
}
}
+ def generateOricPIDDatainfo():DataInfo = {
+ val di =DoiBoostMappingUtil.generateDataInfo("0.91")
+ di.getProvenanceaction.setClassid("sysimport:crosswalk:entityregistry")
+ di.getProvenanceaction.setClassname("Harvested")
+ di
+ }
+
def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = {
val a = new Author
a.setName(given)
@@ -72,7 +83,7 @@ object ORCIDToOAF {
else
a.setFullname(s"$given $family")
if (StringUtils.isNotBlank(orcid))
- a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava)
+ a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateOricPIDDatainfo())).asJava)
a
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
index 1cd9ba4d4..f1c7c58b4 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
@@ -1,21 +1,72 @@
package eu.dnetlib.doiboost.orcid
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.Publication
+import eu.dnetlib.dhp.schema.orcid.OrcidDOI
import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkConvertORCIDToOAF {
+ val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
+
+ def getPublicationAggregator(): Aggregator[(String, Publication), Publication, Publication] = new Aggregator[(String, Publication), Publication, Publication]{
+
+ override def zero: Publication = new Publication()
+
+ override def reduce(b: Publication, a: (String, Publication)): Publication = {
+ b.mergeFrom(a._2)
+ b.setAuthor(AuthorMerger.mergeAuthor(a._2.getAuthor, b.getAuthor))
+ if (b.getId == null)
+ b.setId(a._2.getId)
+ b
+ }
+ override def merge(wx: Publication, wy: Publication): Publication = {
+ wx.mergeFrom(wy)
+ wx.setAuthor(AuthorMerger.mergeAuthor(wy.getAuthor, wx.getAuthor))
+ if(wx.getId == null && wy.getId.nonEmpty)
+ wx.setId(wy.getId)
+ wx
+ }
+ override def finish(reduction: Publication): Publication = reduction
+
+ override def bufferEncoder: Encoder[Publication] =
+ Encoders.kryo(classOf[Publication])
+
+ override def outputEncoder: Encoder[Publication] =
+ Encoders.kryo(classOf[Publication])
+ }
+
+def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
+ implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
+ implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
+ implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
+
+ val mapper = new ObjectMapper()
+ mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+
+ val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
+
+ logger.info("Converting ORCID to OAF")
+ dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
+ .map(d => (d.getId, d))
+ .groupByKey(_._1)(Encoders.STRING)
+ .agg(getPublicationAggregator().toColumn)
+ .map(p => p._2)
+ .write.mode(SaveMode.Overwrite).save(targetPath)
+}
def main(args: Array[String]): Unit = {
- val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
+
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_map_to_oaf_params.json")))
parser.parseArgument(args)
@@ -26,19 +77,12 @@ object SparkConvertORCIDToOAF {
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
- implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
- implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
- import spark.implicits._
+
+
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
- val dataset:Dataset[ORCIDElement] = spark.read.json(sourcePath).as[ORCIDElement]
+ run(spark, sourcePath, targetPath)
-
- logger.info("Converting ORCID to OAF")
- val d:RDD[Publication] = dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null).map(p=>(p.getId,p)).rdd.reduceByKey(ConversionUtil.mergePublication)
- .map(_._2)
-
- spark.createDataset(d).as[Publication].write.mode(SaveMode.Overwrite).save(targetPath)
}
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml
index e35f88abd..dcde62c9d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml
@@ -39,14 +39,7 @@
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
-
-
-
+
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
index 2277b79b0..9d19dddc7 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/oozie_app/workflow.xml
@@ -8,6 +8,10 @@
targetPath
the working dir base path
+
+ workingPath
+ the working dir base path
+
sparkDriverMemory
memory for driver process
@@ -31,10 +35,10 @@
-
-
+
+
-
+
@@ -52,10 +56,10 @@
${sparkExtraOPT}
--sourcePath${sourcePath}
- --targetPath${targetPath}
+ --targetPath${workingPath}
--masteryarn-cluster
-
+
@@ -65,7 +69,7 @@
yarn-cluster
cluster
- Convert Mag to Dataset
+ Convert Mag to OAF Dataset
eu.dnetlib.doiboost.mag.SparkPreProcessMAG
dhp-doiboost-${projectVersion}.jar
@@ -75,7 +79,8 @@
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
- --sourcePath${sourcePath}
+ --sourcePath${workingPath}
+ --workingPath${workingPath}/process
--targetPath${targetPath}
--masteryarn-cluster
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
index bf0b80f69..d45f7269f 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json
@@ -1,6 +1,7 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the base path of MAG input", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target dir path", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
index 5b8240942..0222b393d 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
@@ -1,5 +1,8 @@
package eu.dnetlib.doiboost.orcid
+import eu.dnetlib.dhp.schema.oaf.Publication
+import eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF.getClass
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -21,6 +24,30 @@ class MappingORCIDToOAFTest {
})
}
+// @Test
+// def testOAFConvert():Unit ={
+//
+// val spark: SparkSession =
+// SparkSession
+// .builder()
+// .appName(getClass.getSimpleName)
+// .master("local[*]").getOrCreate()
+//
+//
+// SparkConvertORCIDToOAF.run( spark,"/Users/sandro/Downloads/orcid", "/Users/sandro/Downloads/orcid_oaf")
+// implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
+//
+// val df = spark.read.load("/Users/sandro/Downloads/orcid_oaf").as[Publication]
+// println(df.first.getId)
+// println(mapper.writeValueAsString(df.first()))
+//
+//
+//
+//
+// }
+
+
+