From e9f285ec4d0a8fc22fc19cfe5fdb916fafbec694 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Mon, 6 Dec 2021 14:24:03 +0100
Subject: [PATCH] [scala-refactor] Module dhp-doiboost: moved all Scala sources
 into src/main/scala and src/test/scala

---
 .../doiboost/DoiBoostMappingUtil.scala        | 14 ++-
 .../SparkGenerateDOIBoostActionSet.scala      | 45 ++++------
 .../doiboost/SparkGenerateDoiBoost.scala      | 80 ++++++++----------
 .../doiboost/crossref/Crossref2Oaf.scala      |  9 +-
 .../doiboost/crossref/CrossrefDataset.scala   | 25 +++---
 .../crossref/GenerateCrossrefDataset.scala    | 20 ++---
 .../crossref/SparkMapDumpIntoOAF.scala        |  4 +-
 .../crossref/UnpackCrtossrefEntries.scala     |  9 +-
 .../dnetlib/doiboost/mag/MagDataModel.scala   |  2 +-
 .../mag/SparkImportMagIntoDataset.scala       |  7 +-
 .../doiboost/mag/SparkProcessMAG.scala        | 22 ++---
 .../dnetlib/doiboost/orcid/ORCIDToOAF.scala   |  7 +-
 .../orcid/SparkConvertORCIDToOAF.scala        |  8 +-
 .../doiboost/orcid/SparkPreprocessORCID.scala | 29 +++----
 .../doiboost/uw/SparkMapUnpayWallToOAF.scala  |  8 +-
 .../dnetlib/doiboost/uw/UnpayWallToOAF.scala  |  3 +-
 .../doiboost/DoiBoostHostedByMapTest.scala    | 70 ---------------
 .../doiboost/DoiBoostHostedByMapTest.scala    | 20 +++++
 .../dhp/doiboost/NormalizeDoiTest.scala       |  0
 .../crossref/CrossrefMappingTest.scala        | 86 +++++--------------
 .../dhp}/doiboost/mag/MAGMappingTest.scala    | 11 +--
 .../orcid/MappingORCIDToOAFTest.scala         |  8 +-
 .../doiboost/uw/UnpayWallMappingTest.scala    | 10 +--
 23 files changed, 181 insertions(+), 316 deletions(-)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala (97%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala (63%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala (76%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala (99%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala (77%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala (73%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala (96%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala (88%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/mag/MagDataModel.scala (100%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala (98%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala (90%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala (98%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala (84%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala (67%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala (80%)
 rename dhp-workflows/dhp-doiboost/src/main/{java => scala}/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala (98%)
 delete mode 100644 dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala
 create mode 100644 dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala
 rename dhp-workflows/dhp-doiboost/src/test/{java => scala}/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala (100%)
 rename dhp-workflows/dhp-doiboost/src/test/{java/eu/dnetlib => scala/eu/dnetlib/dhp}/doiboost/crossref/CrossrefMappingTest.scala (88%)
 rename dhp-workflows/dhp-doiboost/src/test/{java/eu/dnetlib => scala/eu/dnetlib/dhp}/doiboost/mag/MAGMappingTest.scala (88%)
 rename dhp-workflows/dhp-doiboost/src/test/{java/eu/dnetlib => scala/eu/dnetlib/dhp}/doiboost/orcid/MappingORCIDToOAFTest.scala (95%)
 rename dhp-workflows/dhp-doiboost/src/test/{java/eu/dnetlib => scala/eu/dnetlib/dhp}/doiboost/uw/UnpayWallMappingTest.scala (91%)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
similarity index 97%
rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
index efd5d2497..3822f40b5 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
@@ -1,21 +1,19 @@
 package eu.dnetlib.doiboost
-import java.time.LocalDate
-import java.time.format.DateTimeFormatter
-
+import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.action.AtomicAction
-import eu.dnetlib.dhp.schema.oaf.{AccessRight, DataInfo, Dataset, Field, Instance, KeyValue, Oaf, OpenAccessRoute, Organization, Publication, Qualifier, Relation, Result, StructuredProperty}
+import eu.dnetlib.dhp.schema.common.ModelConstants
+import eu.dnetlib.dhp.schema.oaf._
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
 import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.commons.lang3.StringUtils
-import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
-import eu.dnetlib.doiboost.DoiBoostMappingUtil.{getClosedAccessQualifier, getEmbargoedAccessQualifier, getUnknownQualifier}
 import org.json4s
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
 import org.slf4j.{Logger, LoggerFactory}
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
 import scala.collection.JavaConverters._
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
similarity index 63%
rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
index 3bfca0859..f13900abe 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala
@@ -8,11 +8,12 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql.{Dataset, Encoder, Encoders,
SparkSession} import org.slf4j.{Logger, LoggerFactory} object SparkGenerateDOIBoostActionSet { val logger: Logger = LoggerFactory.getLogger(getClass) + def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() @@ -33,53 +34,41 @@ object SparkGenerateDOIBoostActionSet { implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]] - val dbPublicationPath = parser.get("dbPublicationPath") - val dbDatasetPath = parser.get("dbDatasetPath") - val crossRefRelation = parser.get("crossRefRelation") - val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") - val dbOrganizationPath = parser.get("dbOrganizationPath") - val sequenceFilePath = parser.get("sFilePath") + val dbPublicationPath = parser.get("dbPublicationPath") + val dbDatasetPath = parser.get("dbDatasetPath") + val crossRefRelation = parser.get("crossRefRelation") + val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") + val dbOrganizationPath = parser.get("dbOrganizationPath") + val sequenceFilePath = parser.get("sFilePath") val asDataset = spark.read.load(dbDatasetPath).as[OafDataset] .filter(p => p != null || p.getId != null) - .map(d =>DoiBoostMappingUtil.fixResult(d)) - .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .map(d => DoiBoostMappingUtil.fixResult(d)) + .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - val asPublication =spark.read.load(dbPublicationPath).as[Publication] + val asPublication = spark.read.load(dbPublicationPath).as[Publication] .filter(p => p != null || p.getId != null) - .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) val asOrganization = spark.read.load(dbOrganizationPath).as[Organization] - .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - + .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) val asCRelation = spark.read.load(crossRefRelation).as[Relation] - .filter(r => r!= null && r.getSource != null && r.getTarget != null) - .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .filter(r => r != null && r.getSource != null && r.getTarget != null) + .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation] - .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - - - + .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation) - - d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) - - - - - - - + d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec]) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala 
b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala similarity index 76% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 501073e74..91fe56cba 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -9,28 +9,26 @@ import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.functions.col -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ +import org.apache.spark.sql._ import org.json4s.DefaultFormats -import org.json4s.JsonAST.{JField, JObject, JString,JArray} +import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse - +import org.slf4j.{Logger, LoggerFactory} +import scala.collection.JavaConverters._ object SparkGenerateDoiBoost { - def extractIdGRID(input:String):List[(String,String)] = { + def extractIdGRID(input: String): List[(String, String)] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: org.json4s.JValue = parse(input) - val id:String = (json \ "id").extract[String] + val id: String = (json \ "id").extract[String] - val grids:List[String] = for { + val grids: List[String] = for { JObject(pid) <- json \ "pid" JField("qualifier", JObject(qualifier)) <- pid - JField("classid", JString(classid)) <-qualifier + JField("classid", JString(classid)) <- qualifier JField("value", JString(vl)) <- pid if classid == "GRID" } yield vl @@ -38,7 +36,6 @@ object SparkGenerateDoiBoost { } - def main(args: Array[String]): Unit = { val logger: Logger = LoggerFactory.getLogger(getClass) @@ -73,7 +70,7 @@ object SparkGenerateDoiBoost { if (a != null && a._2 != null) { b.mergeFrom(a._2) b.setId(a._1) - val authors =AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) + val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) b.setAuthor(authors) return b } @@ -87,11 +84,11 @@ object SparkGenerateDoiBoost { return b2 } else { - if (b2 != null ) { + if (b2 != null) { b1.mergeFrom(b2) - val authors =AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) b1.setAuthor(authors) - if (b2.getId!= null && b2.getId.nonEmpty) + if (b2.getId != null && b2.getId.nonEmpty) b1.setId(b2.getId) return b1 } @@ -118,10 +115,9 @@ object SparkGenerateDoiBoost { val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p)) val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p)) - def applyMerge(item:((String, Publication), (String, Publication))) : Publication = - { + def applyMerge(item: ((String, Publication), (String, Publication))): Publication = { val crossrefPub = item._1._2 - if (item._2!= null) { + if (item._2 != null) { val otherPub = item._2._2 if (otherPub != null) { crossrefPub.mergeFrom(otherPub) @@ -130,6 +126,7 @@ object SparkGenerateDoiBoost { } crossrefPub } + crossrefPublication.joinWith(uwPublication, 
crossrefPublication("_1").equalTo(uwPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin") logger.info("Phase 3) Join Result with ORCID") val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p)) @@ -143,9 +140,9 @@ object SparkGenerateDoiBoost { sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication") - val doiBoostPublication: Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication].filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder) + val doiBoostPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication].filter(p => DoiBoostMappingUtil.filterPublication(p)).map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder) - val hostedByDataset : Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem)) + val hostedByDataset: Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem)) doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left") @@ -164,21 +161,20 @@ object SparkGenerateDoiBoost { val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId")) - val a:Dataset[DoiBoostAffiliation] = paperAffiliation + val a: Dataset[DoiBoostAffiliation] = paperAffiliation .joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId"))) .select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"), col("_2.OfficialPage"), col("_2.DisplayName")).as[DoiBoostAffiliation] - - val magPubs:Dataset[(String,Publication)]= spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication] - .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder).filter(s =>s._1!= null ) + val magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder).filter(s => s._1 != null) - magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => { - val pub:Publication = item._1._2 + magPubs.joinWith(a, magPubs("_1").equalTo(a("PaperId"))).flatMap(item => { + val pub: Publication = item._1._2 val affiliation = item._2 - val affId:String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString) - val r:Relation = new Relation + val affId: String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString) + val r: Relation = new Relation r.setSource(pub.getId) r.setTarget(affId) r.setRelType(ModelConstants.RESULT_ORGANIZATION) @@ -186,7 +182,7 @@ object SparkGenerateDoiBoost { r.setSubRelType(ModelConstants.AFFILIATION) r.setDataInfo(pub.getDataInfo) r.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava) - val r1:Relation = new Relation + val r1: Relation = 
new Relation r1.setTarget(pub.getId) r1.setSource(affId) r1.setRelType(ModelConstants.RESULT_ORGANIZATION) @@ -198,33 +194,31 @@ object SparkGenerateDoiBoost { })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved") - - - val unresolvedRels:Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => { + val unresolvedRels: Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => { if (r.getSource.startsWith("unresolved")) (r.getSource, r) else if (r.getTarget.startsWith("unresolved")) - (r.getTarget,r) - else + (r.getTarget, r) + else ("resolved", r) })(Encoders.tuple(Encoders.STRING, mapEncoderRel)) - val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2) + val openaireOrganization: Dataset[(String, String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x, y) => if (x != null) x else y).map(_._2) - unresolvedRels.joinWith(openaireOrganization,unresolvedRels("_1").equalTo(openaireOrganization("_2"))) + unresolvedRels.joinWith(openaireOrganization, unresolvedRels("_1").equalTo(openaireOrganization("_2"))) .map { x => val currentRels = x._1._2 val currentOrgs = x._2 - if (currentOrgs!= null) - if(currentRels.getSource.startsWith("unresolved")) + if (currentOrgs != null) + if (currentRels.getSource.startsWith("unresolved")) currentRels.setSource(currentOrgs._1) else currentRels.setTarget(currentOrgs._1) - currentRels - }.filter(r=> !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation") + currentRels + }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation") - magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => { + magPubs.joinWith(a, magPubs("_1").equalTo(a("PaperId"))).map(item => { val affiliation = item._2 if (affiliation.GridId.isEmpty) { val o = new Organization @@ -241,7 +235,7 @@ object SparkGenerateDoiBoost { } else null - }).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization") - } + }).filter(o => o != null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization") + } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala similarity index 99% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 1b1c850ba..edca4a180 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -4,20 +4,19 @@ import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils} import eu.dnetlib.dhp.utils.DHPUtils -import 
eu.dnetlib.doiboost.DoiBoostMappingUtil.{decideAccessRight, _} +import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.apache.commons.lang.StringUtils import org.json4s import org.json4s.DefaultFormats -import org.json4s.JsonAST.{JValue, _} +import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.slf4j.{Logger, LoggerFactory} +import java.util import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex -import java.util - -import eu.dnetlib.doiboost.DoiBoostMappingUtil case class CrossrefDT(doi: String, json:String, timestamp: Long) {} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala similarity index 77% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala index 159b817c7..6a1c701af 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala @@ -6,7 +6,7 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.io.{IntWritable, Text} import org.apache.spark.SparkConf import org.apache.spark.sql.expressions.Aggregator -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -17,12 +17,12 @@ object CrossrefDataset { val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) - def to_item(input:String):CrossrefDT = { + def to_item(input: String): CrossrefDT = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) - val ts:Long = (json \ "indexed" \ "timestamp").extract[Long] - val doi:String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) + val ts: Long = (json \ "indexed" \ "timestamp").extract[Long] + val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) CrossrefDT(doi, input, ts) } @@ -30,7 +30,6 @@ object CrossrefDataset { def main(args: Array[String]): Unit = { - val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"))) parser.parseArgument(args) @@ -54,7 +53,7 @@ object CrossrefDataset { return b - if(a.timestamp >b.timestamp) { + if (a.timestamp > b.timestamp) { return a } b @@ -66,7 +65,7 @@ object CrossrefDataset { if (a == null) return b - if(a.timestamp >b.timestamp) { + if (a.timestamp > b.timestamp) { return a } b @@ -79,20 +78,20 @@ object CrossrefDataset { override def finish(reduction: CrossrefDT): CrossrefDT = reduction } - val workingPath:String = parser.get("workingPath") + val workingPath: String = parser.get("workingPath") - val main_ds:Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] + val main_ds: Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] val update = - spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], 
classOf[Text]) - .map(i =>CrossrefImporter.decompressBlob(i._2.toString)) - .map(i =>to_item(i))) + spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text]) + .map(i => CrossrefImporter.decompressBlob(i._2.toString)) + .map(i => to_item(i))) main_ds.union(update).groupByKey(_.doi) .agg(crossrefAggregator.toColumn) - .map(s=>s._2) + .map(s => s._2) .write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated") } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala similarity index 73% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala index 526ff7b3a..6d03abc25 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala @@ -2,17 +2,12 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.doiboost.DoiBoostMappingUtil -import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item -import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass -import org.apache.hadoop.io.{IntWritable, Text} -import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.{SparkConf, SparkContext} import org.json4s import org.json4s.DefaultFormats -import org.json4s.JsonAST.JArray -import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.io.Source @@ -24,11 +19,10 @@ object GenerateCrossrefDataset { implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] - def crossrefElement(meta: String): CrossrefDT = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(meta) - val doi:String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) + val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] CrossrefDT(doi, meta, timestamp) @@ -51,14 +45,14 @@ object GenerateCrossrefDataset { import spark.implicits._ - val tmp : RDD[String] = sc.textFile(sourcePath,6000) + val tmp: RDD[String] = sc.textFile(sourcePath, 6000) spark.createDataset(tmp) .map(entry => crossrefElement(entry)) .write.mode(SaveMode.Overwrite).save(targetPath) -// .map(meta => crossrefElement(meta)) -// .toDS.as[CrossrefDT] -// .write.mode(SaveMode.Overwrite).save(targetPath) + // .map(meta => crossrefElement(meta)) + // .toDS.as[CrossrefDT] + // .write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala similarity index 96% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala rename to 
dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index c65916610..fa55b9fb9 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -4,10 +4,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} import org.apache.commons.io.IOUtils - import org.apache.spark.SparkConf - -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala similarity index 88% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala index 95ecb568b..191c4587e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala @@ -2,8 +2,8 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.json4s import org.json4s.DefaultFormats import org.json4s.JsonAST.JArray @@ -17,9 +17,7 @@ object UnpackCrtossrefEntries { val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass) - - - def extractDump(input:String):List[String] = { + def extractDump(input: String): List[String] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) @@ -30,7 +28,6 @@ object UnpackCrtossrefEntries { } - def main(args: Array[String]): Unit = { val conf = new SparkConf val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) @@ -45,7 +42,7 @@ object UnpackCrtossrefEntries { .getOrCreate() val sc: SparkContext = spark.sparkContext - sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) + sc.wholeTextFiles(sourcePath, 6000).flatMap(d => extractDump(d._2)) .saveAsTextFile(targetPath, classOf[GzipCodec]) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala index fd9629024..0a6fa00f0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -5,10 +5,10 @@ import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory 
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty} import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse -import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import scala.collection.JavaConverters._ import scala.collection.mutable diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala index a68d0bb2d..d25a4893f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -3,8 +3,8 @@ package eu.dnetlib.doiboost.mag import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} object SparkImportMagIntoDataset { @@ -24,13 +24,13 @@ object SparkImportMagIntoDataset { "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")), "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), - "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), + "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", 
"CreatedDate:DateTime")), "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")), "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")), "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")), "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), - "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")), + "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")), "PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")), "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")), @@ -75,7 +75,6 @@ object SparkImportMagIntoDataset { .master(parser.get("master")).getOrCreate() - stream.foreach { case (k, v) => val s: StructType = getSchema(k) val df = spark.read diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala similarity index 90% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala index 0ea4b66a4..41e95baa1 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala @@ -5,22 +5,19 @@ import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.doiboost.DoiBoostMappingUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ +import org.apache.spark.sql.functions.{col, collect_list, struct} import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} - import scala.collection.JavaConverters._ - object SparkProcessMAG { - def getDistinctResults (d:Dataset[MagPapers]):Dataset[MagPapers]={ + def getDistinctResults(d: Dataset[MagPapers]): Dataset[MagPapers] = { d.where(col("Doi").isNotNull) .groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING) - .reduceGroups((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2)) + .reduceGroups((p1: MagPapers, p2: MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1, p2)) 
.map(_._2)(Encoders.product[MagPapers]) .map(mp => { - new MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi), + MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi), mp.DocType, mp.PaperTitle, mp.OriginalTitle, mp.BookTitle, mp.Year, mp.Date, mp.Publisher: String, mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId, @@ -98,13 +95,13 @@ object SparkProcessMAG { var magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingPath/merge_step_2").as[Publication] - .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] + .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] val conference = spark.read.load(s"$sourcePath/ConferenceInstances") - .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" ) + .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate", $"EndDate") val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci"))) - .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] + .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate", $"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left") @@ -122,7 +119,7 @@ object SparkProcessMAG { magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left") .map(item => ConversionUtil.updatePubsWithDescription(item) - ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") + ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") logger.info("Phase 7) Enrich Publication with FieldOfStudy") @@ -148,11 +145,10 @@ object SparkProcessMAG { spark.read.load(s"$workingPath/mag_publication").as[Publication] .filter(p => p.getId != null) .groupByKey(p => p.getId) - .reduceGroups((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b)) + .reduceGroups((a: Publication, b: Publication) => ConversionUtil.mergePublication(a, b)) .map(_._2) .write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication") - } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala index 1cd3f7028..11031f9ca 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala @@ -4,17 +4,16 @@ import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication} -import eu.dnetlib.dhp.schema.orcid.{AuthorData, OrcidDOI} import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo} import org.apache.commons.lang.StringUtils -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConverters._ import org.json4s import org.json4s.DefaultFormats import org.json4s.JsonAST._ import 
org.json4s.jackson.JsonMethods._ +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala similarity index 84% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala index fa4a93e00..1b189e296 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala @@ -11,10 +11,10 @@ object SparkConvertORCIDToOAF { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def run(spark:SparkSession, workingPath:String, targetPath:String) :Unit = { + def run(spark: SparkSession, workingPath: String, targetPath: String): Unit = { implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] import spark.implicits._ - val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] + val dataset: Dataset[ORCIDItem] = spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] logger.info("Converting ORCID to OAF") dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) @@ -35,8 +35,8 @@ object SparkConvertORCIDToOAF { val workingPath = parser.get("workingPath") val targetPath = parser.get("targetPath") - run(spark,workingPath, targetPath) + run(spark, workingPath, targetPath) } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala similarity index 67% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala index 31f331912..153be5dd1 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/orcid/SparkPreprocessORCID.scala @@ -1,48 +1,45 @@ package eu.dnetlib.doiboost.orcid -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.oa.merge.AuthorMerger import eu.dnetlib.dhp.schema.oaf.Publication -import eu.dnetlib.dhp.schema.orcid.OrcidDOI import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.functions.{col, collect_list} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} object SparkPreprocessORCID { val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) - def fixORCIDItem(item :ORCIDItem):ORCIDItem = { - ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) + def fixORCIDItem(item: ORCIDItem): ORCIDItem = { + ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) } 
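The fixORCIDItem helper just above deduplicates the author list of an ORCIDItem by ORCID iD with a groupBy-then-head idiom. A minimal, self-contained sketch of that idiom, assuming simplified stand-in case classes (the module's real OrcidAuthor and ORCIDItem carry more fields than shown here):

// Sketch of the author-deduplication idiom used by fixORCIDItem above.
// OrcidAuthor and ORCIDItem are simplified stand-ins, not the module's real types.
case class OrcidAuthor(oid: String, name: String)
case class ORCIDItem(doi: String, authors: List[OrcidAuthor])

def dedupAuthors(item: ORCIDItem): ORCIDItem =
  // Group authors by ORCID iD and keep the first record of each group,
  // so repeated records for the same person collapse to one entry.
  ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList)

// Example with a hypothetical DOI: two records sharing an oid collapse to one.
val fixed = dedupAuthors(ORCIDItem("10.1000/hypothetical",
  List(OrcidAuthor("0000-0001-0000-0001", "A. Rossi"),
       OrcidAuthor("0000-0001-0000-0001", "A. Rossi"),
       OrcidAuthor("0000-0001-0000-0002", "B. Bianchi"))))
// fixed.authors.size == 2

One caveat of this idiom: Scala Map iteration order is unspecified, so the deduplicated list is not guaranteed to preserve the original author order.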
- def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = { + def run(spark: SparkSession, sourcePath: String, workingPath: String): Unit = { import spark.implicits._ implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] - val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) + val inputRDD: RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s != null).filter(s => ORCIDToOAF.authorValid(s)) spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") - val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) + val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s != null) spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") - val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] + val authors: Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] - val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] + val works: Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] works.joinWith(authors, authors("oid").equalTo(works("oid"))) - .map(i =>{ + .map(i => { val doi = i._1.doi val author = i._2 - (doi, author) - }).groupBy(col("_1").alias("doi")) + (doi, author) + }).groupBy(col("_1").alias("doi")) .agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem] .map(s => fixORCIDItem(s)) .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") @@ -67,4 +64,4 @@ object SparkPreprocessORCID { } -} \ No newline at end of file +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala similarity index 80% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala index 4530926f1..70290018d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala @@ -1,16 +1,14 @@ package eu.dnetlib.doiboost.uw import eu.dnetlib.dhp.application.ArgumentApplicationParser - import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql._ import org.slf4j.{Logger, LoggerFactory} - object SparkMapUnpayWallToOAF { def main(args: Array[String]): Unit = { @@ -32,11 +30,11 @@ object SparkMapUnpayWallToOAF { val sourcePath = parser.get("sourcePath") val targetPath = parser.get("targetPath") - val inputRDD:RDD[String] = spark.sparkContext.textFile(s"$sourcePath") + val inputRDD: RDD[String] = spark.sparkContext.textFile(s"$sourcePath") logger.info("Converting UnpayWall to OAF") - val d:Dataset[Publication] = 
spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication] + val d: Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p => p != null)).as[Publication] d.write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala similarity index 98% rename from dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala rename to dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala index c8324cde1..bf5694965 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala @@ -4,14 +4,13 @@ import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.{AccessRight, Instance, OpenAccessRoute, Publication} import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.DoiBoostMappingUtil._ import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ -import eu.dnetlib.doiboost.DoiBoostMappingUtil._ -import eu.dnetlib.doiboost.uw.UnpayWallToOAF.get_unpaywall_color diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala deleted file mode 100644 index 4912648be..000000000 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala +++ /dev/null @@ -1,70 +0,0 @@ -package eu.dnetlib.dhp.doiboost - -import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset} -import eu.dnetlib.doiboost.{DoiBoostMappingUtil, HostedByItemType} -import eu.dnetlib.doiboost.SparkGenerateDoiBoost.getClass -import eu.dnetlib.doiboost.mag.ConversionUtil -import eu.dnetlib.doiboost.orcid.ORCIDElement -import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} -import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} -import org.junit.jupiter.api.Test - -import scala.io.Source - -class DoiBoostHostedByMapTest { - - - -// @Test -// def testMerge():Unit = { -// val conf: SparkConf = new SparkConf() -// val spark: SparkSession = -// SparkSession -// .builder() -// .config(conf) -// .appName(getClass.getSimpleName) -// .master("local[*]").getOrCreate() -// -// -// -// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] -// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] -// implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub) -// -// -// import spark.implicits._ -// val dataset:RDD[String]= spark.sparkContext.textFile("/home/sandro/Downloads/hbMap.gz") -// -// -// val hbMap:Dataset[(String, HostedByItemType)] =spark.createDataset(dataset.map(DoiBoostMappingUtil.toHostedByItem)) -// -// -// hbMap.show() -// -// -// -// -// -// -// -// -// -// -// } - - - @Test - def idDSGeneration():Unit = { - val s ="doajarticles::0066-782X" - - - - println(DoiBoostMappingUtil.generateDSId(s)) - - - } - - -} diff 
--git a/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala new file mode 100644 index 000000000..41730ade0 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala @@ -0,0 +1,20 @@ +package eu.dnetlib.dhp.doiboost + +import eu.dnetlib.doiboost.DoiBoostMappingUtil +import org.junit.jupiter.api.Test + +class DoiBoostHostedByMapTest { + + @Test + def idDSGeneration():Unit = { + val s ="doajarticles::0066-782X" + + + + println(DoiBoostMappingUtil.generateDSId(s)) + + + } + + +} diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala similarity index 100% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/NormalizeDoiTest.scala diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala similarity index 88% rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala index 5ef92cfa4..71dbf27be 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/crossref/CrossrefMappingTest.scala @@ -1,7 +1,8 @@ -package eu.dnetlib.doiboost.crossref +package eu.dnetlib.dhp.doiboost.crossref import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.utils.DHPUtils +import eu.dnetlib.doiboost.crossref.Crossref2Oaf import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -21,9 +22,9 @@ class CrossrefMappingTest { @Test def testFunderRelationshipsMapping(): Unit = { - val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString - val funder_doi = Source.fromInputStream(getClass.getResourceAsStream("funder_doi")).mkString - val funder_name = Source.fromInputStream(getClass.getResourceAsStream("funder_doi")).mkString + val template = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article_funder_template.json")).mkString + val funder_doi = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/funder_doi")).mkString + val funder_name = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/funder_doi")).mkString for (line <- funder_doi.lines) { @@ -72,7 +73,7 @@ class CrossrefMappingTest { @Test def testOrcidID() :Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/orcid_data.json")).mkString assertNotNull(json) @@ -93,7 +94,7 @@ class CrossrefMappingTest { @Test def testEmptyTitle() :Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString + val json = 
Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/empty_title.json")).mkString assertNotNull(json) @@ -115,7 +116,7 @@ class CrossrefMappingTest { @Test def testPeerReviewed(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("prwTest.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/prwTest.json")).mkString mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) assertNotNull(json) @@ -156,7 +157,7 @@ class CrossrefMappingTest { @Test def testJournalRelation(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("awardTest.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/awardTest.json")).mkString assertNotNull(json) assertFalse(json.isEmpty) @@ -177,7 +178,7 @@ class CrossrefMappingTest { @Test def testConvertBookFromCrossRef2Oaf(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("book.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/book.json")).mkString assertNotNull(json) assertFalse(json.isEmpty); @@ -233,7 +234,7 @@ class CrossrefMappingTest { @Test def testConvertPreprintFromCrossRef2Oaf(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("preprint.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/preprint.json")).mkString assertNotNull(json) assertFalse(json.isEmpty); @@ -291,7 +292,7 @@ class CrossrefMappingTest { @Test def testConvertDatasetFromCrossRef2Oaf(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("dataset.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/dataset.json")).mkString assertNotNull(json) assertFalse(json.isEmpty); @@ -332,7 +333,7 @@ class CrossrefMappingTest { @Test def testConvertArticleFromCrossRef2Oaf(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("article.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article.json")).mkString assertNotNull(json) assertFalse(json.isEmpty); @@ -400,7 +401,7 @@ class CrossrefMappingTest { @Test def testSetDateOfAcceptanceCrossRef2Oaf(): Unit = { - val json = Source.fromInputStream(getClass.getResourceAsStream("dump_file.json")).mkString + val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/dump_file.json")).mkString assertNotNull(json) assertFalse(json.isEmpty); @@ -415,55 +416,12 @@ class CrossrefMappingTest { assert(items.size == 1) val result: Result = items.head.asInstanceOf[Publication] assertNotNull(result) - logger.info(mapper.writeValueAsString(result)); - -// assertNotNull(result.getDataInfo, "Datainfo test not null Failed"); -// assertNotNull( -// result.getDataInfo.getProvenanceaction, -// "DataInfo/Provenance test not null Failed"); -// assertFalse( -// result.getDataInfo.getProvenanceaction.getClassid.isEmpty, -// "DataInfo/Provenance/classId test not null Failed"); -// assertFalse( -// result.getDataInfo.getProvenanceaction.getClassname.isEmpty, -// "DataInfo/Provenance/className test not null Failed"); -// assertFalse( -// result.getDataInfo.getProvenanceaction.getSchemeid.isEmpty, -// "DataInfo/Provenance/SchemeId test not null Failed"); -// assertFalse( -// 
@@ -415,55 +416,12 @@ class CrossrefMappingTest {
     assert(items.size == 1)
     val result: Result = items.head.asInstanceOf[Publication]
     assertNotNull(result)
-    logger.info(mapper.writeValueAsString(result));
-
-//    assertNotNull(result.getDataInfo, "Datainfo test not null Failed");
-//    assertNotNull(
-//      result.getDataInfo.getProvenanceaction,
-//      "DataInfo/Provenance test not null Failed");
-//    assertFalse(
-//      result.getDataInfo.getProvenanceaction.getClassid.isEmpty,
-//      "DataInfo/Provenance/classId test not null Failed");
-//    assertFalse(
-//      result.getDataInfo.getProvenanceaction.getClassname.isEmpty,
-//      "DataInfo/Provenance/className test not null Failed");
-//    assertFalse(
-//      result.getDataInfo.getProvenanceaction.getSchemeid.isEmpty,
-//      "DataInfo/Provenance/SchemeId test not null Failed");
-//    assertFalse(
-//      result.getDataInfo.getProvenanceaction.getSchemename.isEmpty,
-//      "DataInfo/Provenance/SchemeName test not null Failed");
-//
-//    assertNotNull(result.getCollectedfrom, "CollectedFrom test not null Failed");
-//    assertFalse(result.getCollectedfrom.isEmpty);
-//
-//    val collectedFromList = result.getCollectedfrom.asScala
-//    assert(collectedFromList.exists(c => c.getKey.equalsIgnoreCase("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")), "Wrong collected from assertion")
-//
-//    assert(collectedFromList.exists(c => c.getValue.equalsIgnoreCase("crossref")), "Wrong collected from assertion")
-//
-//
-//    val relevantDates = result.getRelevantdate.asScala
-//
-//    assert(relevantDates.exists(d => d.getQualifier.getClassid.equalsIgnoreCase("created")), "Missing relevant date of type created")
-//
-//    val rels = resultList.filter(p => p.isInstanceOf[Relation]).asInstanceOf[List[Relation]]
-//    assertFalse(rels.isEmpty)
-//    rels.foreach(relation => {
-//      assertNotNull(relation)
-//      assertFalse(relation.getSource.isEmpty)
-//      assertFalse(relation.getTarget.isEmpty)
-//      assertFalse(relation.getRelClass.isEmpty)
-//      assertFalse(relation.getRelType.isEmpty)
-//      assertFalse(relation.getSubRelType.isEmpty)
-//
-//    })
   }


   @Test
   def testNormalizeDOI(): Unit = {
-    val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString
+    val template = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article_funder_template.json")).mkString
     val line :String = "\"funder\": [{\"name\": \"Wellcome Trust Masters Fellowship\",\"award\": [\"090633\"]}],"
     val json = template.replace("%s", line)
     val resultList: List[Oaf] = Crossref2Oaf.convert(json)
@@ -479,7 +437,7 @@ class CrossrefMappingTest {

   @Test
   def testNormalizeDOI2(): Unit = {
-    val template = Source.fromInputStream(getClass.getResourceAsStream("article.json")).mkString
+    val template = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article.json")).mkString

     val resultList: List[Oaf] = Crossref2Oaf.convert(template)
     assertTrue(resultList.nonEmpty)
@@ -494,7 +452,7 @@ class CrossrefMappingTest {

   @Test
   def testLicenseVorClosed() :Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_vor.json")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_vor.json")).mkString


     assertNotNull(json)
@@ -521,7 +479,7 @@ class CrossrefMappingTest {

   @Test
   def testLicenseOpen() :Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_open.json")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_open.json")).mkString


     assertNotNull(json)
@@ -544,7 +502,7 @@ class CrossrefMappingTest {

   @Test
   def testLicenseEmbargoOpen() :Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo_open.json")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_embargo_open.json")).mkString


     assertNotNull(json)
@@ -567,7 +525,7 @@ class CrossrefMappingTest {

   @Test
   def testLicenseEmbargo() :Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo.json")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_embargo.json")).mkString

     assertNotNull(json)
@@ -591,7 +549,7 @@ class CrossrefMappingTest {

   @Test
   def testLicenseEmbargoDateTime() :Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo_datetime.json")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_embargo_datetime.json")).mkString


     assertNotNull(json)
@@ -614,7 +572,7 @@ class CrossrefMappingTest {

   @Test
   def testMultipleURLs() :Unit = {
-    val json = Source.fromInputStream(getClass.getResourceAsStream("multiple_urls.json")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/multiple_urls.json")).mkString


     assertNotNull(json)
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/mag/MAGMappingTest.scala
similarity index 88%
rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/mag/MAGMappingTest.scala
index 46d4ec08d..611f3b323 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/mag/MAGMappingTest.scala
@@ -1,11 +1,12 @@
-package eu.dnetlib.doiboost.mag
+package eu.dnetlib.dhp.doiboost.mag

+import eu.dnetlib.doiboost.mag.{ConversionUtil, MagPapers, SparkProcessMAG}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.codehaus.jackson.map.ObjectMapper
+import org.json4s.DefaultFormats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.json4s.DefaultFormats
 import org.slf4j.{Logger, LoggerFactory}

 import java.sql.Timestamp
@@ -47,7 +48,7 @@ class MAGMappingTest {

   @Test
   def buildInvertedIndexTest(): Unit = {
-    val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString
+    val json_input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/mag/invertedIndex.json")).mkString
     val description = ConversionUtil.convertInvertedIndexString(json_input)
     assertNotNull(description)
     assertTrue(description.nonEmpty)
@@ -71,7 +72,7 @@ class MAGMappingTest {
       .appName(getClass.getSimpleName)
       .config(conf)
       .getOrCreate()
-    val path = getClass.getResource("magPapers.json").getPath
+    val path = getClass.getResource("/eu/dnetlib/doiboost/mag/magPapers.json").getPath

     import org.apache.spark.sql.Encoders
     val schema = Encoders.product[MagPapers].schema
@@ -101,7 +102,7 @@ class MAGMappingTest {
       .appName(getClass.getSimpleName)
       .config(conf)
       .getOrCreate()
-    val path = getClass.getResource("duplicatedMagPapers.json").getPath
+    val path = getClass.getResource("/eu/dnetlib/doiboost/mag/duplicatedMagPapers.json").getPath

     import org.apache.spark.sql.Encoders
     val schema = Encoders.product[MagPapers].schema
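The two MAG hunks above leave the surrounding test logic untouched: a typed Dataset[MagPapers] is built by deriving a Spark schema from the case class via Encoders.product and reading the JSON fixture against it. A minimal self-contained sketch of that pattern, assuming a reduced MagPapers case class and a local file path (both illustrative, not from this module):

    import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

    object MagPapersReadSketch {
      // illustrative subset of the real MagPapers fields
      case class MagPapers(PaperId: Long, Doi: String)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("MagPapersReadSketch")
          .getOrCreate()
        import spark.implicits._

        // derive the schema from the case class instead of inferring it from the data
        val schema = Encoders.product[MagPapers].schema
        val papers: Dataset[MagPapers] =
          spark.read.schema(schema).json("/tmp/magPapers.json").as[MagPapers]

        println(s"loaded ${papers.count()} papers")
        spark.stop()
      }
    }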
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala
similarity index 95%
rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala
index b484dc087..7c8f01f81 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/MappingORCIDToOAFTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/orcid/MappingORCIDToOAFTest.scala
@@ -1,7 +1,8 @@
-package eu.dnetlib.doiboost.orcid
+package eu.dnetlib.dhp.doiboost.orcid

 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.oaf.Publication
+import eu.dnetlib.doiboost.orcid._
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
 import org.junit.jupiter.api.Assertions._
@@ -10,9 +11,8 @@ import org.junit.jupiter.api.io.TempDir
 import org.slf4j.{Logger, LoggerFactory}

 import java.nio.file.Path
-import scala.io.Source
-
 import scala.collection.JavaConversions._
+import scala.io.Source

 class MappingORCIDToOAFTest {
   val logger: Logger = LoggerFactory.getLogger(ORCIDToOAF.getClass)
@@ -20,7 +20,7 @@ class MappingORCIDToOAFTest {

   @Test
   def testExtractData():Unit ={
-    val json = Source.fromInputStream(getClass.getResourceAsStream("dataOutput")).mkString
+    val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/orcid/dataOutput")).mkString
     assertNotNull(json)
     assertFalse(json.isEmpty)
     json.lines.foreach(s => {
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala
similarity index 91%
rename from dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala
rename to dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala
index fa696fffc..6671758b2 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/uw/UnpayWallMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/scala/eu/dnetlib/dhp/doiboost/uw/UnpayWallMappingTest.scala
@@ -1,13 +1,13 @@
-package eu.dnetlib.doiboost.uw
-
+package eu.dnetlib.dhp.doiboost.uw
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.oaf.OpenAccessRoute
+import eu.dnetlib.doiboost.uw.UnpayWallToOAF
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
+import org.slf4j.{Logger, LoggerFactory}

 import scala.io.Source
-import org.junit.jupiter.api.Assertions._
-import org.slf4j.{Logger, LoggerFactory}


 class UnpayWallMappingTest {

@@ -18,7 +18,7 @@ class UnpayWallMappingTest {

   @Test
   def testMappingToOAF():Unit ={
-    val Ilist = Source.fromInputStream(getClass.getResourceAsStream("input.json")).mkString
+    val Ilist = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/uw/input.json")).mkString
     var i:Int = 0
     for (line <-Ilist.lines) {
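The excerpt breaks off inside the testMappingToOAF hunk; the loop it opens follows the same shape as the other mapping tests: read the fixture once, then feed it to the mapper line by line. A sketch of that pattern under the new resource-path convention (the object, the null check, and the call to UnpayWallToOAF.convertToOAF are illustrative assumptions, not lines from this patch):

    package eu.dnetlib.dhp.doiboost.uw

    import eu.dnetlib.doiboost.uw.UnpayWallToOAF
    import scala.io.Source

    object UnpayWallLoopSketch {
      def main(args: Array[String]): Unit = {
        // one UnpayWall record per line, loaded from the classpath root
        val iList = Source
          .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/uw/input.json"))
          .mkString

        for (line <- iList.lines) {
          val publication = UnpayWallToOAF.convertToOAF(line)
          if (publication != null)
            println(publication.getId) // records that fail mapping come back null
        }
      }
    }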