From 4e55ddc547451c18b9e14cce8244015e6825b3cf Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 19 Nov 2024 16:50:42 +0100 Subject: [PATCH] [PubMed aggregation] storing contents into mdStoreVersion/store --- .../dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala index 1bdd2a4bcc..adac9ffb6e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala @@ -2,9 +2,13 @@ package eu.dnetlib.dhp.sx.bio.ebi import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.common.Constants +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2, PubMedToOaf} +import eu.dnetlib.dhp.transformation.TransformSparkJobNode +import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile import eu.dnetlib.dhp.utils.ISLookupClientFactory import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -82,7 +86,10 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg .write .option("compression", "gzip") .mode("overwrite") - .text(targetPath) + .text(targetPath + MDSTORE_DATA_PATH) + + val mdStoreSize = spark.read.text(targetPath + MDSTORE_DATA_PATH).count + writeHdfsFile(spark.sparkContext.hadoopConfiguration, "" + mdStoreSize, targetPath + MDSTORE_SIZE_PATH) } }