From a6977197b3925c520f76961cfff7b05c37a51d6f Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Mon, 3 Jan 2022 17:25:26 +0100
Subject: [PATCH] Serialise records in the OAF-store-graph mdstores in JSON
 format. Read them again in the graph construction phase using a tolerant
 parser, to support backward-compatible evolution of the schema.

---
 .../dhp/collection/CollectionUtils.scala      | 13 ++++++++
 .../GenerateDataciteDatasetSpark.scala        | 16 +++++-----
 .../bio/SparkTransformBioDatabaseToOAF.scala  | 10 +++---
 .../ebi/SparkCreateBaselineDataFrame.scala    | 14 ++++----
 .../dhp/sx/bio/ebi/SparkEBILinksToOaf.scala   |  7 ++--
 .../dhp/sx/bio/pubmed/PubMedToOaf.scala       |  2 +-
 .../raw/CopyHdfsOafSparkApplication.scala     | 32 ++++++++++++++++---
 7 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
index 11ecfd6cb..26fcecbbd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/CollectionUtils.scala
@@ -1,7 +1,9 @@
 package eu.dnetlib.dhp.collection
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.schema.common.ModelSupport
 import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode}
 
 object CollectionUtils {
 
@@ -46,4 +48,15 @@ object CollectionUtils {
       List()
   }
 
+  def saveDataset(d: Dataset[Oaf], targetPath: String): Unit = {
+    implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+    val mapper = new ObjectMapper
+
+    d
+      .flatMap(i => CollectionUtils.fixRelations(i))
+      .filter(i => i != null)
+      .map(r => mapper.writeValueAsString(r))(Encoders.STRING)
+      .write.mode(SaveMode.Overwrite).save(targetPath)
+  }
+
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
index e1607ee9c..d11c33fb4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
@@ -2,14 +2,14 @@ package eu.dnetlib.dhp.datacite
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
+import eu.dnetlib.dhp.collection.CollectionUtils
 import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
 import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
-import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
 
@@ -73,12 +73,12 @@ class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log
     implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
     implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
 
-    spark.read.load(sourcePath).as[DataciteType]
-      .filter(d => d.isActive)
-      .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
-      .filter(d => d != null)
-      .flatMap(i => fixRelations(i)).filter(i => i != null)
-      .write.mode(SaveMode.Overwrite).save(targetPath)
+    CollectionUtils.saveDataset(
+      spark.read.load(sourcePath).as[DataciteType]
+        .filter(d => d.isActive)
+        .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
+        .filter(d => d != null),
+      targetPath)
 
   }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index fcceacd44..27caa8f36 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -6,7 +6,7 @@ import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
 object SparkTransformBioDatabaseToOAF {
@@ -36,13 +36,13 @@ object SparkTransformBioDatabaseToOAF {
     import spark.implicits._
     database.toUpperCase() match {
       case "UNIPROT" =>
-        spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        CollectionUtils.saveDataset(spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))), targetPath)
       case "PDB" =>
-        spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        CollectionUtils.saveDataset(spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))), targetPath)
       case "SCHOLIX" =>
-        spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        CollectionUtils.saveDataset(spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)), targetPath)
       case "CROSSREF_LINKS" =>
-        spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
+        CollectionUtils.saveDataset(spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))), targetPath)
     }
   }
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
index 660a26a6c..0fea4ff7f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame.scala
@@ -1,8 +1,9 @@
 package eu.dnetlib.dhp.sx.bio.ebi
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.collection.CollectionUtils
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
-import eu.dnetlib.dhp.schema.oaf.Result
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
 import eu.dnetlib.dhp.sx.bio.pubmed._
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.commons.io.IOUtils
@@ -177,7 +178,7 @@
     implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
     implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
     implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])
-    implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
+    implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
 
     if (!"true".equalsIgnoreCase(skipUpdate)) {
       downloadBaseLineUpdate(s"$workingPath/baseline", hdfsServerUri)
@@ -192,9 +193,10 @@
     }
 
     val exported_dataset = spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle]
-    exported_dataset
-      .map(a => PubMedToOaf.convert(a, vocabularies)).as[Result]
-      .filter(p => p != null)
-      .write.mode(SaveMode.Overwrite).save(targetPath)
+    CollectionUtils.saveDataset(exported_dataset
+      .map(a => PubMedToOaf.convert(a, vocabularies)).as[Oaf]
+      .filter(p => p != null),
+      targetPath)
+
   }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
index 12af4824b..cd03f004d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkEBILinksToOaf.scala
@@ -34,10 +34,9 @@ object SparkEBILinksToOaf {
 
     val ebLinks: Dataset[EBILinkItem] = spark.read.load(sourcePath).as[EBILinkItem].filter(l => l.links != null && l.links.startsWith("{"))
 
-    ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
+    CollectionUtils.saveDataset(ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
       .filter(p => BioDBToOAF.EBITargetLinksFilter(p))
-      .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
-      .flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null)
-      .write.mode(SaveMode.Overwrite).save(targetPath)
+      .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)),
+      targetPath)
   }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
index 4a93a7cb4..65717adff 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/pubmed/PubMedToOaf.scala
@@ -109,7 +109,7 @@
    * @param vocabularies the vocabularies
    * @return The OAF instance if the mapping did not fail
    */
-  def convert(article: PMArticle, vocabularies: VocabularyGroup): Result = {
+  def convert(article: PMArticle, vocabularies: VocabularyGroup): Oaf = {
 
     if (article.getPublicationTypes == null)
       return null
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala
index 91d2bafe3..1376c6b35 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala
@@ -1,6 +1,6 @@
 package eu.dnetlib.dhp.oa.graph.raw
 
-import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.common.HdfsSupport
 import eu.dnetlib.dhp.schema.common.ModelSupport
@@ -8,7 +8,10 @@ import eu.dnetlib.dhp.schema.oaf.Oaf
 import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
 import org.apache.spark.{SparkConf, SparkContext}
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods.parse
 import org.slf4j.LoggerFactory
+
 import scala.collection.JavaConverters._
 import scala.io.Source
 
@@ -45,18 +48,21 @@ object CopyHdfsOafSparkApplication {
     log.info("hdfsPath: {}", hdfsPath)
 
     implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+    import spark.implicits._
 
     val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
 
     val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
 
     if (validPaths.nonEmpty) {
-      val oaf = spark.read.load(validPaths: _*).as[Oaf]
-      val mapper = new ObjectMapper()
-      val l = ModelSupport.oafTypes.entrySet.asScala.map(e => e.getKey).toList
+      val oaf = spark.read.load(validPaths: _*).as[String]
+      val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+      val l = ModelSupport.oafTypes.entrySet.asScala.toList
       l.foreach(
         e =>
-          oaf.filter(o => o.getClass.getSimpleName.equalsIgnoreCase(e))
+          oaf
+            .filter(o => isOafType(o, e.getKey))
+            .map(j => mapper.readValue(j, e.getValue).asInstanceOf[Oaf])
             .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
             .write
             .option("compression", "gzip")
@@ -65,4 +71,20 @@
       )
     }
   }
+
+  def isOafType(input: String, oafType: String): Boolean = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+    lazy val json: org.json4s.JValue = parse(input)
+    if (oafType == "relation") {
+      val hasSource = (json \ "source").extractOrElse[String](null)
+      val hasTarget = (json \ "target").extractOrElse[String](null)
+
+      hasSource != null && hasTarget != null
+    } else {
+      val hasId = (json \ "id").extractOrElse[String](null)
+      val resultType = (json \ "resulttype" \ "classid").extractOrElse[String](null)
+      hasId != null && oafType.equalsIgnoreCase(resultType)
+    }
+
+  }
 }
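
Reviewer note: the "tolerant parser" introduced above is a plain Jackson
ObjectMapper with FAIL_ON_UNKNOWN_PROPERTIES disabled, so records written by a
newer version of the schema (carrying extra fields) can still be read by an
older reader. CollectionUtils.saveDataset() writes the JSON payload through the
default Spark datasource as a single string column, which
CopyHdfsOafSparkApplication reads back with spark.read.load(...).as[String].
A minimal, Spark-free sketch of the round trip; the Record class is a
hypothetical stand-in for an Oaf entity and is not part of this patch:

    import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
    import scala.beans.BeanProperty

    // Hypothetical stand-in for an Oaf entity (not part of the patch).
    class Record {
      @BeanProperty var id: String = _
    }

    object TolerantRoundTrip {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
          .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

        // Pretend this was serialised by a newer schema that added a field
        // unknown to this version of Record.
        val json = """{"id":"50|doi_________::abc","newField":"x"}"""

        // The unknown field is skipped instead of raising an
        // UnrecognizedPropertyException, so the old reader keeps working.
        val record = mapper.readValue(json, classOf[Record])
        println(record.getId) // prints 50|doi_________::abc
      }
    }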
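
The isOafType() helper added to CopyHdfsOafSparkApplication can be exercised in
isolation as well; a small sketch with made-up JSON fragments (the field values
are illustrative only):

    import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication.isOafType

    object IsOafTypeCheck {
      def main(args: Array[String]): Unit = {
        // Relations are recognised by the presence of both source and target.
        val rel = """{"source":"1","target":"2","relClass":"cites"}"""
        // Results are recognised by an id plus a matching resulttype.classid.
        val pub = """{"id":"50|abc","resulttype":{"classid":"publication"}}"""

        println(isOafType(rel, "relation"))    // true
        println(isOafType(pub, "publication")) // true
        println(isOafType(pub, "dataset"))     // false: classid does not match
      }
    }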