diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala similarity index 99% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 1b1c850ba3..edca4a1802 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -4,20 +4,19 @@ import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils} import eu.dnetlib.dhp.utils.DHPUtils -import eu.dnetlib.doiboost.DoiBoostMappingUtil.{decideAccessRight, _} +import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.apache.commons.lang.StringUtils import org.json4s import org.json4s.DefaultFormats -import org.json4s.JsonAST.{JValue, _} +import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.slf4j.{Logger, LoggerFactory} +import java.util import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex -import java.util - -import eu.dnetlib.doiboost.DoiBoostMappingUtil case class CrossrefDT(doi: String, json:String, timestamp: Long) {} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala similarity index 77% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala index 159b817c7f..6a1c701aff 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala @@ -6,7 +6,7 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.io.{IntWritable, Text} import org.apache.spark.SparkConf import org.apache.spark.sql.expressions.Aggregator -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -17,12 +17,12 @@ object CrossrefDataset { val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) - def to_item(input:String):CrossrefDT = { + def to_item(input: String): CrossrefDT = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) - val ts:Long = (json \ "indexed" \ "timestamp").extract[Long] - val doi:String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) + val ts: Long = (json \ "indexed" \ "timestamp").extract[Long] + val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) CrossrefDT(doi, input, ts) } @@ -30,7 +30,6 @@ object CrossrefDataset { def main(args: Array[String]): Unit = { - val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"))) parser.parseArgument(args) @@ -54,7 +53,7 @@ object CrossrefDataset { return b - if(a.timestamp >b.timestamp) { + if (a.timestamp > b.timestamp) { return a } b @@ -66,7 +65,7 @@ object CrossrefDataset { if (a == null) return b - if(a.timestamp >b.timestamp) { + if (a.timestamp > b.timestamp) { return a } b @@ -79,20 +78,20 @@ object CrossrefDataset { override def finish(reduction: CrossrefDT): CrossrefDT = reduction } - val workingPath:String = parser.get("workingPath") + val workingPath: String = parser.get("workingPath") - val main_ds:Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] + val main_ds: Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] val update = - spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text]) - .map(i =>CrossrefImporter.decompressBlob(i._2.toString)) - .map(i =>to_item(i))) + spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text]) + .map(i => CrossrefImporter.decompressBlob(i._2.toString)) + .map(i => to_item(i))) main_ds.union(update).groupByKey(_.doi) .agg(crossrefAggregator.toColumn) - .map(s=>s._2) + .map(s => s._2) .write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated") } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefImporter.java similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefImporter.java rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefImporter.java diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ESClient.java b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/ESClient.java similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ESClient.java rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/ESClient.java diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/ExtractCrossrefRecords.java diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala similarity index 73% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala index 526ff7b3ad..6d03abc259 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala @@ -2,17 +2,12 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.doiboost.DoiBoostMappingUtil -import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item -import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass -import org.apache.hadoop.io.{IntWritable, Text} -import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} import org.json4s import org.json4s.DefaultFormats -import org.json4s.JsonAST.JArray -import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.io.Source @@ -24,11 +19,10 @@ object GenerateCrossrefDataset { implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] - def crossrefElement(meta: String): CrossrefDT = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(meta) - val doi:String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) + val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] CrossrefDT(doi, meta, timestamp) @@ -51,14 +45,14 @@ object GenerateCrossrefDataset { import spark.implicits._ - val tmp : RDD[String] = sc.textFile(sourcePath,6000) + val tmp: RDD[String] = sc.textFile(sourcePath, 6000) spark.createDataset(tmp) .map(entry => crossrefElement(entry)) .write.mode(SaveMode.Overwrite).save(targetPath) -// .map(meta => crossrefElement(meta)) -// .toDS.as[CrossrefDT] -// .write.mode(SaveMode.Overwrite).save(targetPath) + // .map(meta => crossrefElement(meta)) + // .toDS.as[CrossrefDT] + // .write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala similarity index 96% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index c65916610c..fa55b9fb9a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -4,10 +4,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} import org.apache.commons.io.IOUtils - import org.apache.spark.SparkConf - -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala similarity index 88% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala index 95ecb568bd..191c4587ec 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala @@ -2,8 +2,8 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.json4s import org.json4s.DefaultFormats import org.json4s.JsonAST.JArray @@ -17,9 +17,7 @@ object UnpackCrtossrefEntries { val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass) - - - def extractDump(input:String):List[String] = { + def extractDump(input: String): List[String] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) @@ -30,7 +28,6 @@ object UnpackCrtossrefEntries { } - def main(args: Array[String]): Unit = { val conf = new SparkConf val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) @@ -45,7 +42,7 @@ object UnpackCrtossrefEntries { .getOrCreate() val sc: SparkContext = spark.sparkContext - sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) + sc.wholeTextFiles(sourcePath, 6000).flatMap(d => extractDump(d._2)) .saveAsTextFile(targetPath, classOf[GzipCodec]) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala index fd96290242..0a6fa00f04 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -5,10 +5,10 @@ import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty} import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse -import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import scala.collection.JavaConverters._ import scala.collection.mutable diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala index a68d0bb2d4..d25a4893f5 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -3,8 +3,8 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} object SparkImportMagIntoDataset { @@ -24,13 +24,13 @@ object SparkImportMagIntoDataset { "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")), "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), - "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), + "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")), "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")), "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")), "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), - "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")), + "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")), "PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")), "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")), @@ -75,7 +75,6 @@ object SparkImportMagIntoDataset { .master(parser.get("master")).getOrCreate() - stream.foreach { case (k, v) => val s: StructType = getSchema(k) val df = spark.read diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala similarity index 91% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala index 016279787d..9327254464 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala @@ -5,19 +5,16 @@ import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.doiboost.DoiBoostMappingUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ +import org.apache.spark.sql.functions.{col, collect_list, struct} import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} - import scala.collection.JavaConverters._ - object SparkProcessMAG { - def getDistinctResults (d:Dataset[MagPapers]):Dataset[MagPapers]={ + def getDistinctResults(d: Dataset[MagPapers]): Dataset[MagPapers] = { d.where(col("Doi").isNotNull) .groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING) - .reduceGroups((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2)) + .reduceGroups((p1: MagPapers, p2: MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1, p2)) .map(_._2)(Encoders.product[MagPapers]) .map(mp => { new MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi), @@ -98,13 +95,13 @@ object SparkProcessMAG { var magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingPath/merge_step_2").as[Publication] - .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val conference = spark.read.load(s"$sourcePath/ConferenceInstances") - .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" ) + .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate", $"EndDate") val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci"))) - .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] + .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate", $"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left") @@ -122,7 +119,7 @@ object SparkProcessMAG { magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left") .map(item => ConversionUtil.updatePubsWithDescription(item) - ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") + ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") logger.info("Phase 7) Enrich Publication with FieldOfStudy") @@ -148,11 +145,10 @@ object SparkProcessMAG { spark.read.load(s"$workingPath/mag_publication").as[Publication] .filter(p => p.getId == null) .groupByKey(p => p.getId) - .reduceGroups((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b)) + .reduceGroups((a: Publication, b: Publication) => ConversionUtil.mergePublication(a, b)) .map(_._2) .write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication") - } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala index 1cd3f70283..11031f9ca4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala @@ -4,17 +4,16 @@ import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication} -import eu.dnetlib.dhp.schema.orcid.{AuthorData, OrcidDOI} import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo} import org.apache.commons.lang.StringUtils -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ import org.json4s import org.json4s.DefaultFormats import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala similarity index 84% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index fa4a93e008..1b189e2969 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -11,10 +11,10 @@ object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def run(spark:SparkSession, workingPath:String, targetPath:String) :Unit = { + def run(spark: SparkSession, workingPath: String, targetPath: String): Unit = { implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] import spark.implicits._ - val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] + val dataset: Dataset[ORCIDItem] = spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] logger.info("Converting ORCID to OAF") dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) @@ -35,8 +35,8 @@ object SparkConvertORCIDToOAF { val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - run(spark,workingPath, targetPath) + run(spark, workingPath, targetPath) } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala similarity index 67% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala index 31f3319122..153be5dd13 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala @@ -1,48 +1,45 @@ package eu.dnetlib.doiboost.orcid -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.oaf.Publication -import eu.dnetlib.dhp.schema.orcid.OrcidDOI import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.functions.{col, collect_list} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} object SparkPreprocessORCID { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def fixORCIDItem(item :ORCIDItem):ORCIDItem = { - ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) + def fixORCIDItem(item: ORCIDItem): ORCIDItem = { + ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) } - def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = { + def run(spark: SparkSession, sourcePath: String, workingPath: String): Unit = { import spark.implicits._ implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) + val inputRDD: RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s != null).filter(s => ORCIDToOAF.authorValid(s)) spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") - val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) + val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s != null) spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") - val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] + val authors: Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] - val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] + val works: Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] works.joinWith(authors, authors("oid").equalTo(works("oid"))) - .map(i =>{ + .map(i => { val doi = i._1.doi val author = i._2 - (doi, author) - }).groupBy(col("_1").alias("doi")) + (doi, author) + }).groupBy(col("_1").alias("doi")) .agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem] .map(s => fixORCIDItem(s)) .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") @@ -67,4 +64,4 @@ object SparkPreprocessORCID { } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala similarity index 80% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala index 4530926f10..70290018d9 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala @@ -1,16 +1,14 @@ package eu.dnetlib.doiboost.uw import eu.dnetlib.dhp.application.ArgumentApplicationParser - import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} - object SparkMapUnpayWallToOAF { def main(args: Array[String]): Unit = { @@ -32,11 +30,11 @@ object SparkMapUnpayWallToOAF { val sourcePath = parser.get("sourcePath") val targetPath = parser.get("targetPath") - val inputRDD:RDD[String] = spark.sparkContext.textFile(s"$sourcePath") + val inputRDD: RDD[String] = spark.sparkContext.textFile(s"$sourcePath") logger.info("Converting UnpayWall to OAF") - val d:Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication] + val d: Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p => p != null)).as[Publication] d.write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala index c8324cde16..bf56949658 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala @@ -4,14 +4,13 @@ import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.{AccessRight, Instance, OpenAccessRoute, Publication} import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ -import eu.dnetlib.doiboost.DoiBoostMappingUtil._ -import eu.dnetlib.doiboost.uw.UnpayWallToOAF.get_unpaywall_color diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/DoiBoostHostedByMapTest.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/DoiBoostHostedByMapTest.scala index 4912648bea..049ac37f47 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/DoiBoostHostedByMapTest.scala @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.doiboost +package eu.dnetlib.doiboost import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset} import eu.dnetlib.doiboost.{DoiBoostMappingUtil, HostedByItemType} diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/NormalizeDoiTest.scala similarity index 96% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/NormalizeDoiTest.scala index a9a841ee91..bdf845f190 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/NormalizeDoiTest.scala @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.doiboost +package eu.dnetlib.doiboost import eu.dnetlib.doiboost.DoiBoostMappingUtil import org.junit.jupiter.api.Test diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/mag/MAGMappingTest.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index 46d4ec08d6..7403e103e4 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -3,9 +3,9 @@ package eu.dnetlib.doiboost.mag import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, SparkSession} import org.codehaus.jackson.map.ObjectMapper +import org.json4s.DefaultFormats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test -import org.json4s.DefaultFormats import org.slf4j.{Logger, LoggerFactory} import java.sql.Timestamp diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala similarity index 99% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala index b484dc0878..a5ce6296cf 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala @@ -10,9 +10,8 @@ import org.junit.jupiter.api.io.TempDir import org.slf4j.{Logger, LoggerFactory} import java.nio.file.Path -import scala.io.Source - import scala.collection.JavaConversions._ +import scala.io.Source class MappingORCIDToOAFTest { val logger: Logger = LoggerFactory.getLogger(ORCIDToOAF.getClass) diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala index fa696fffc7..012ed3da09 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala @@ -3,11 +3,11 @@ package eu.dnetlib.doiboost.uw import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.oaf.OpenAccessRoute +import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test +import org.slf4j.{Logger, LoggerFactory} import scala.io.Source -import org.junit.jupiter.api.Assertions._ -import org.slf4j.{Logger, LoggerFactory} class UnpayWallMappingTest {