package eu.dnetlib.dhp.sx.graph.pangaea

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.io.Source

object SparkGeneratePanagaeaDataset {

  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()

    // Parse the command-line arguments against the JSON parameter
    // definition bundled with the application resources.
    val parser = new ArgumentApplicationParser(
      Source
        .fromInputStream(
          getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/pangaea/pangaea_to_dataset.json")
        )
        .mkString
    )
    parser.parseArgument(args)

    val spark: SparkSession = SparkSession
      .builder()
      .config(conf)
      .appName(SparkGeneratePanagaeaDataset.getClass.getSimpleName)
      .master(parser.get("master"))
      .getOrCreate()

    // Log every parsed parameter for traceability.
    parser.getObjectMap.asScala.foreach(s => logger.info(s"${s._1} -> ${s._2}"))
    logger.info("Converting sequential file into Dataset")

    val sc: SparkContext = spark.sparkContext
    val workingPath: String = parser.get("workingPath")

    // PangaeaDataModel is not a case class, so a Kryo encoder is used.
    implicit val pangaeaEncoders: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]

    // Read the raw update records and parse each line into a PangaeaDataModel.
    val inputRDD: RDD[PangaeaDataModel] =
      sc.textFile(s"$workingPath/update").map(s => PangaeaUtils.toDataset(s))

    // Key each record by its identifier, merge records sharing an identifier
    // with the dataset aggregator, and persist the deduplicated result.
    spark
      .createDataset(inputRDD)
      .as[PangaeaDataModel]
      .map(s => (s.identifier, s))(Encoders.tuple(Encoders.STRING, pangaeaEncoders))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(PangaeaUtils.getDatasetAggregator().toColumn)
      .map(s => s._2)
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingPath/dataset")
  }
}
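
// ---------------------------------------------------------------------------
// Illustration only: the aggregator returned by
// PangaeaUtils.getDatasetAggregator() is defined elsewhere in this project.
// The hypothetical sketch below shows the shape such a Spark SQL Aggregator
// must have to satisfy the groupByKey(...).agg(...) call above: it reduces
// all (identifier, record) pairs sharing a key to a single PangaeaDataModel.
// The "keep the first non-null record" merge rule is an assumption for the
// sake of the example, not the project's actual merge logic.
// ---------------------------------------------------------------------------
import org.apache.spark.sql.expressions.Aggregator

object ExamplePangaeaAggregator
    extends Aggregator[(String, PangaeaDataModel), PangaeaDataModel, PangaeaDataModel] {

  // Empty buffer: no record seen yet for this identifier.
  override def zero: PangaeaDataModel = null

  // Fold one (identifier, record) pair into the buffer.
  override def reduce(buffer: PangaeaDataModel, row: (String, PangaeaDataModel)): PangaeaDataModel =
    if (buffer == null) row._2 else buffer

  // Combine two partial buffers produced on different partitions.
  override def merge(b1: PangaeaDataModel, b2: PangaeaDataModel): PangaeaDataModel =
    if (b1 == null) b2 else b1

  // The buffer is already the final merged record.
  override def finish(reduction: PangaeaDataModel): PangaeaDataModel = reduction

  // Kryo encoders, matching the encoder strategy used in the job above.
  override def bufferEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
  override def outputEncoder: Encoder[PangaeaDataModel] = Encoders.kryo[PangaeaDataModel]
}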