package eu.dnetlib.dhp.sx.bio.ebi

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.CollectionUtils
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

class SparkTransformEBILinksToOaf(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log) {

  /** Reads the collected EBI link records from sourcePath, parses and filters them,
    * converts them to Oaf entities and saves the result under targetPath.
    */
  def transformLinks(spark: SparkSession, sourcePath: String, outputBasePath: String, targetPath: String): Unit = {
    implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
    import spark.implicits._

    // Keep only records whose links payload is a non-null JSON object
    val ebLinks: Dataset[EBILinkItem] = spark.read
      .load(sourcePath)
      .as[EBILinkItem]
      .filter(l => l.links != null && l.links.startsWith("{"))

    CollectionUtils.saveDataset(
      ebLinks
        .flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
        .filter(p => BioDBToOAF.EBITargetLinksFilter(p))
        .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)),
      targetPath
    )
    reportTotalSize(targetPath, outputBasePath)
  }

  /** All Spark applications run this method; the whole logic of the Spark node
    * is defined here.
    */
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath is '$sourcePath'")

    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")

    // The MDStore version is passed as JSON; its hdfsPath locates the output MDStore
    val mapper = new ObjectMapper()
    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
    log.info(s"outputBasePath is '$outputBasePath'")

    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
    log.info(s"targetPath is '$targetPath'")

    transformLinks(spark, sourcePath, outputBasePath, targetPath)
  }
}

object SparkTransformEBILinksToOaf {
  val log: Logger = LoggerFactory.getLogger(SparkTransformEBILinksToOaf.getClass)

  def main(args: Array[String]): Unit = {
    new SparkTransformEBILinksToOaf("/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json", args, log)
      .initialize()
      .run()
  }
}
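
/* A minimal usage sketch. Assumptions, not confirmed by this file: the
 * "--sourcePath"/"--mdstoreOutputVersion" long-option syntax, the example
 * paths, and the MDStoreVersion JSON shape are all hypothetical; only the
 * parameter names (read via parser.get above) and the hdfsPath field (read
 * via getHdfsPath) are taken from the code. Further parameters declared in
 * transform_ebi_to_df_params.json (e.g. a Spark master) may also be required.
 *
 *   SparkTransformEBILinksToOaf.main(
 *     Array(
 *       "--sourcePath", "/data/ebi/links",                          // hypothetical input path
 *       "--mdstoreOutputVersion", """{"hdfsPath":"/data/mdstores/md-1/v1"}""" // assumed JSON shape
 *     )
 *   )
 *
 * With these arguments, getHdfsPath yields "/data/mdstores/md-1/v1" and the
 * converted Oaf dataset is written under s"$outputBasePath$MDSTORE_DATA_PATH".
 */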