From 7af0bbd0b1da976b628f123d9134724f4fa3cea9 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Mon, 6 Dec 2021 11:26:36 +0100
Subject: [PATCH] [scala-refactor] Module dhp-aggregation: Moved all scala
 source into src/main/scala and src/test/scala

---
 .../scholix/SparkCreateActionset.scala        | 69 ---------------
 .../scholix/SparkSaveActionSet.scala          | 86 -------------------
 .../dhp/collection/CollectionUtils.scala      |  0
 .../dhp/datacite/AbstractRestClient.scala     |  0
 .../dhp/datacite/DataciteAPIImporter.scala    |  0
 .../dhp/datacite/DataciteModelConstants.scala |  2 +-
 .../DataciteToOAFTransformation.scala         | 35 ++++----
 .../GenerateDataciteDatasetSpark.scala        |  0
 .../dnetlib/dhp/datacite/ImportDatacite.scala |  0
 .../SparkDownloadUpdateDatacite.scala         |  0
 .../eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala    |  1 +
 .../bio/SparkTransformBioDatabaseToOAF.scala  | 12 +--
 .../ebi/SparkCreateBaselineDataFrame.scala    |  2 +-
 .../sx/bio/ebi/SparkDownloadEBILinks.scala    |  3 +-
 .../dhp/sx/bio/ebi/SparkEBILinksToOaf.scala   |  5 +-
 .../dnetlib/dhp/sx/bio/pubmed/PMParser.scala  |  0
 .../dhp/sx/bio/pubmed/PubMedToOaf.scala       | 19 ++--
 .../dhp/datacite/DataciteToOAFTest.scala      |  2 +-
 .../dnetlib/dhp/sx/bio/BioScholixTest.scala   |  0
 19 files changed, 39 insertions(+), 197 deletions(-)
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/collection/CollectionUtils.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/AbstractRestClient.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala (93%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/ImportDatacite.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala (99%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala (73%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala (98%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala (98%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala (93%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala (100%)
 rename dhp-workflows/dhp-aggregation/src/main/{java => scala}/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala (96%)
 rename dhp-workflows/dhp-aggregation/src/test/{java => scala}/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala (99%)
 rename dhp-workflows/dhp-aggregation/src/test/{java => scala}/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala (100%)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala
deleted file mode 100644
index 7a87861db..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkCreateActionset.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package eu.dnetlib.dhp.actionmanager.scholix
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql._
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
-
-object SparkCreateActionset {
-
-  def main(args: Array[String]): Unit = {
-    val log: Logger = LoggerFactory.getLogger(getClass)
-    val conf: SparkConf = new SparkConf()
-    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
-    parser.parseArgument(args)
-
-
-    val spark: SparkSession =
-      SparkSession
-        .builder()
-        .config(conf)
-        .appName(getClass.getSimpleName)
-        .master(parser.get("master")).getOrCreate()
-
-
-    val sourcePath = parser.get("sourcePath")
-    log.info(s"sourcePath -> $sourcePath")
-
-    val targetPath = parser.get("targetPath")
-    log.info(s"targetPath -> $targetPath")
-
-    val workingDirFolder = parser.get("workingDirFolder")
-    log.info(s"workingDirFolder -> $workingDirFolder")
-
-    implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
-    implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
-    implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
-
-    import spark.implicits._
-
-    val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
-
-    relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
-      .flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
-
-
-    val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
-
-    log.info("extract source and target Identifier involved in relations")
-
-
-    log.info("save relation filtered")
-
-    relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
-      .write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
-
-    log.info("saving entities")
-
-    val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
-
-    entities
-      .joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
-      .map(p => p._1._2)
-      .write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
-  }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala
deleted file mode 100644
index 1df7ea3fb..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/scholix/SparkSaveActionSet.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package eu.dnetlib.dhp.actionmanager.scholix
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.action.AtomicAction
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
-
-object SparkSaveActionSet {
-
-
-  def toActionSet(item: Oaf): (String, String) = {
-    val mapper = new ObjectMapper()
-
-    item match {
-      case dataset: OafDataset =>
-        val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
-        a.setClazz(classOf[OafDataset])
-        a.setPayload(dataset)
-        (dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
-      case publication: Publication =>
-        val a: AtomicAction[Publication] = new AtomicAction[Publication]
-        a.setClazz(classOf[Publication])
-        a.setPayload(publication)
-        (publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
-      case software: Software =>
-        val a: AtomicAction[Software] = new AtomicAction[Software]
-        a.setClazz(classOf[Software])
-        a.setPayload(software)
-        (software.getClass.getCanonicalName, mapper.writeValueAsString(a))
-      case orp: OtherResearchProduct =>
-        val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
-        a.setClazz(classOf[OtherResearchProduct])
-        a.setPayload(orp)
-        (orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
-
-      case relation: Relation =>
-        val a: AtomicAction[Relation] = new AtomicAction[Relation]
-        a.setClazz(classOf[Relation])
-        a.setPayload(relation)
-        (relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
-      case _ =>
-        null
-    }
-
-  }
-
-  def main(args: Array[String]): Unit = {
-    val log: Logger = LoggerFactory.getLogger(getClass)
-    val conf: SparkConf = new SparkConf()
-    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString)
-    parser.parseArgument(args)
-
-
-    val spark: SparkSession =
-      SparkSession
-        .builder()
-        .config(conf)
-        .appName(getClass.getSimpleName)
-        .master(parser.get("master")).getOrCreate()
-
-
-    val sourcePath = parser.get("sourcePath")
-    log.info(s"sourcePath -> $sourcePath")
-
-    val targetPath = parser.get("targetPath")
-    log.info(s"targetPath -> $targetPath")
-
-    implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
-    implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
-
-    spark.read.load(sourcePath).as[Oaf]
-      .map(o => toActionSet(o))
-      .filter(o => o != null)
-      .rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
-
-  }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectionUtils.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/AbstractRestClient.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/AbstractRestClient.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/AbstractRestClient.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteAPIImporter.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
index 0685a04a3..6c5dc8cce 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala
@@ -1,8 +1,8 @@
 package eu.dnetlib.dhp.datacite
 
 import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
+import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
 
 import java.io.InputStream
 import java.time.format.DateTimeFormatter
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
similarity index 93%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
index 6b4deef0a..a662cf99d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala
@@ -6,7 +6,7 @@ import eu.dnetlib.dhp.datacite.DataciteModelConstants._
 import eu.dnetlib.dhp.schema.action.AtomicAction
 import eu.dnetlib.dhp.schema.common.ModelConstants
 import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils}
-import eu.dnetlib.dhp.schema.oaf.{AccessRight, Author, DataInfo, Instance, KeyValue, Oaf, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _}
 import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.commons.lang3.StringUtils
 import org.json4s.DefaultFormats
@@ -29,6 +29,7 @@ object DataciteToOAFTransformation {
   /**
    * This method should skip record if json contains invalid text
    * defined in gile datacite_filter
+   *
    * @param json
    * @return True if the record should be skipped
    */
@@ -107,9 +108,9 @@ object DataciteToOAFTransformation {
     d
   }
 
-  def fix_thai_date(input:String, format:String) :String = {
+  def fix_thai_date(input: String, format: String): String = {
     try {
-      val a_date = LocalDate.parse(input,DateTimeFormatter.ofPattern(format))
+      val a_date = LocalDate.parse(input, DateTimeFormatter.ofPattern(format))
       val d = ThaiBuddhistDate.of(a_date.getYear, a_date.getMonth.getValue, a_date.getDayOfMonth)
       LocalDate.from(d).toString
     } catch {
@@ -236,7 +237,7 @@ object DataciteToOAFTransformation {
       val p = match_pattern.get._2
       val grantId = m.matcher(awardUri).replaceAll("$2")
       val targetId = s"$p${DHPUtils.md5(grantId)}"
-      List( generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) )
+      List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo))
     }
     else
       List()
@@ -335,15 +336,15 @@ object DataciteToOAFTransformation {
       .map(d => d.get)
 
     if (a_date.isDefined) {
-      if(doi.startsWith("10.14457"))
-        result.setEmbargoenddate(OafMapperUtils.field(fix_thai_date(a_date.get,"[yyyy-MM-dd]"), null))
+      if (doi.startsWith("10.14457"))
+        result.setEmbargoenddate(OafMapperUtils.field(fix_thai_date(a_date.get, "[yyyy-MM-dd]"), null))
       else
         result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null))
     }
     if (i_date.isDefined && i_date.get.isDefined) {
-      if(doi.startsWith("10.14457")) {
-        result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null))
-        result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null))
+      if (doi.startsWith("10.14457")) {
+        result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get, "[yyyy-MM-dd]"), null))
+        result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get, "[yyyy-MM-dd]"), null))
       }
       else {
         result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
@@ -351,9 +352,9 @@
       }
     }
     else if (publication_year != null) {
-      if(doi.startsWith("10.14457")) {
-        result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null))
-        result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null))
+      if (doi.startsWith("10.14457")) {
+        result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year", "[dd-MM-yyyy]"), null))
+        result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year", "[dd-MM-yyyy]"), null))
      }
       else {
         result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
@@ -457,7 +458,7 @@
         JField("relatedIdentifier", JString(relatedIdentifier)) <- relIdentifier
       } yield RelatedIdentifierType(relationType, relatedIdentifier, relatedIdentifierType)
 
-      relations = relations ::: generateRelations(rels,result.getId, if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null)
+      relations = relations ::: generateRelations(rels, result.getId, if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null)
     }
     if (relations != null && relations.nonEmpty) {
       List(result) ::: relations
@@ -466,7 +467,7 @@
       List(result)
   }
 
-  private def generateRelations(rels: List[RelatedIdentifierType], id:String, date:String):List[Relation] = {
+  private def generateRelations(rels: List[RelatedIdentifierType], id: String, date: String): List[Relation] = {
     rels
       .filter(r => subRelTypeMapping.contains(r.relationType) && (
@@ -484,12 +485,12 @@
         rel.setSubRelType(subRelType)
         rel.setRelClass(r.relationType)
 
-        val dateProps:KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
+        val dateProps: KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
 
         rel.setProperties(List(dateProps).asJava)
 
         rel.setSource(id)
-        rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType))
+        rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier, r.relatedIdentifierType))
         rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
         rel.getCollectedfrom.asScala.map(c => c.getValue).toList
         rel
@@ -504,4 +505,4 @@
   }
 
 
-}
\ No newline at end of file
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/ImportDatacite.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/ImportDatacite.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/ImportDatacite.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/SparkDownloadUpdateDatacite.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
index 70dcc0184..853b24862 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
@@ -7,6 +7,7 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonAST.{JField, JObject, JString}
 import org.json4s.jackson.JsonMethods.{compact, parse, render}
 import collection.JavaConverters._
+
 object BioDBToOAF {
 
   case class EBILinkItem(id: Long, links: String) {}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
similarity index 73%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index 8ae8285e3..fcceacd44 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -1,9 +1,9 @@
 package eu.dnetlib.dhp.sx.bio
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.Oaf
-import BioDBToOAF.ScholixResolved
 import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -36,13 +36,13 @@
     import spark.implicits._
 
     database.toUpperCase() match {
       case "UNIPROT" =>
-        spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
       case "PDB" =>
-        spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
       case "SCHOLIX" =>
-        spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
       case "CROSSREF_LINKS" =>
-        spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
     }
   }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
similarity index 98%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
index 17d21f19c..660a26a6c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.sx.bio.ebi
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.oaf.Result
-import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
+import eu.dnetlib.dhp.sx.bio.pubmed._
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
similarity index 98%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
index eab6b1dc6..18e39387f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkDownloadEBILinks.scala
@@ -1,9 +1,8 @@
 package eu.dnetlib.dhp.sx.bio.ebi
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
 import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
-import eu.dnetlib.dhp.sx.bio.pubmed.PMJournal
+import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
 import org.apache.commons.io.IOUtils
 import org.apache.http.client.config.RequestConfig
 import org.apache.http.client.methods.HttpGet
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
similarity index 93%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
index 8da617ca0..12af4824b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
@@ -1,11 +1,10 @@
 package eu.dnetlib.dhp.sx.bio.ebi
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.collection.CollectionUtils
 import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.sx.bio.BioDBToOAF
 import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
-import BioDBToOAF.EBILinkItem
-import eu.dnetlib.dhp.collection.CollectionUtils
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -38,7 +37,7 @@ object SparkEBILinksToOaf {
     ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
       .filter(p => BioDBToOAF.EBITargetLinksFilter(p))
       .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
-      .flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null)
+      .flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null)
       .write.mode(SaveMode.Overwrite).save(targetPath)
   }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PMParser.scala
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
similarity index 96%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
rename to dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
index ecef32202..4a93a7cb4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
@@ -4,7 +4,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.common.ModelConstants
 import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
 import eu.dnetlib.dhp.schema.oaf._
 
-import scala.collection.JavaConverters._
+import collection.JavaConverters._
 
 import java.util.regex.Pattern
@@ -22,10 +22,10 @@ object PubMedToOaf {
   val collectedFrom: KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
 
 
-
   /**
    * Cleaning the DOI Applying regex in order to
    * remove doi starting with URL
+   *
    * @param doi input DOI
    * @return cleaned DOI
    */
@@ -49,7 +49,7 @@
    * starting from OAF instanceType value
    *
    * @param cobjQualifier OAF instance type
-   * @param vocabularies All dnet vocabularies
+   * @param vocabularies  All dnet vocabularies
    * @return the correct instance
    */
   def createResult(cobjQualifier: Qualifier, vocabularies: VocabularyGroup): Result = {
@@ -65,7 +65,7 @@
   }
 
   /**
-   * Mapping the Pubmedjournal info into the OAF Journale
+   * Mapping the Pubmedjournal info into the OAF Journale
    *
    * @param j the pubmedJournal
    * @return the OAF Journal
@@ -91,9 +91,8 @@
    * Find vocabulary term into synonyms and term in the vocabulary
    *
    * @param vocabularyName the input vocabulary name
-   * @param vocabularies all the vocabularies
-   * @param term the term to find
-   *
+   * @param vocabularies   all the vocabularies
+   * @param term           the term to find
    * @return the cleaned term value
    */
   def getVocabularyTerm(vocabularyName: String, vocabularies: VocabularyGroup, term: String): Qualifier = {
@@ -104,10 +103,9 @@
 
 
   /**
-   * Map the Pubmed Article into the OAF instance
+   * Map the Pubmed Article into the OAF instance
    *
-
-   * @param article the pubmed articles
+   * @param article      the pubmed articles
    * @param vocabularies the vocabularies
    * @return The OAF instance if the mapping did not fail
    */
@@ -185,7 +183,6 @@
 
     //--------------------------------------------------------------------------------------
-
     // RESULT MAPPING
     //--------------------------------------------------------------------------------------
     result.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(article.getDate), dataInfo))
 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
similarity index 99%
rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
rename to dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
index 8c8b4b5bf..5bb6ba67d 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala
@@ -8,6 +8,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.functions.{col, count}
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.junit.jupiter.MockitoExtension
@@ -17,7 +18,6 @@ import java.nio.file.{Files, Path}
 import java.text.SimpleDateFormat
 import java.util.Locale
 import scala.io.Source
-import org.junit.jupiter.api.Assertions._
 
 @ExtendWith(Array(classOf[MockitoExtension]))
 class DataciteToOAFTest extends AbstractVocabularyTest{
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala
rename to dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/sx/bio/BioScholixTest.scala