1
0
Fork 0

[PubMed aggregation] storing contents into mdStoreVersion/store

This commit is contained in:
Claudio Atzori 2024-11-19 16:50:42 +01:00
parent ef51a60f19
commit 4e55ddc547
1 changed files with 8 additions and 1 deletions

View File

@ -2,9 +2,13 @@ package eu.dnetlib.dhp.sx.bio.ebi
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.common.Constants
import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2, PubMedToOaf} import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2, PubMedToOaf}
import eu.dnetlib.dhp.transformation.TransformSparkJobNode
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import eu.dnetlib.dhp.utils.ISLookupClientFactory import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
@ -82,7 +86,10 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
.write .write
.option("compression", "gzip") .option("compression", "gzip")
.mode("overwrite") .mode("overwrite")
.text(targetPath) .text(targetPath + MDSTORE_DATA_PATH)
val mdStoreSize = spark.read.text(targetPath + MDSTORE_DATA_PATH).count
writeHdfsFile(spark.sparkContext.hadoopConfiguration, "" + mdStoreSize, targetPath + MDSTORE_SIZE_PATH)
} }
} }