package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal, PMParser}

import java.nio.charset.StandardCharsets

import scala.io.Source
import scala.xml.pull.XMLEventReader
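/** Spark job that parses the gzipped XML dumps found under
 * s"$workingPath/baseline" (presumably a local mirror of the PubMed/MEDLINE
 * baseline) into PMArticle records and saves them as a kryo-serialized
 * Dataset under s"$workingPath/baseline_dataset".
 *
 * Hypothetical invocation (the actual option names are declared in
 * ebi_to_df_params.json, which is not shown here):
 *
 *   spark-submit --class eu.dnetlib.dhp.sx.ebi.SparkCreateBaselineDataFrame \
 *     <assembly-jar> --master yarn --workingPath /data/ebi
 */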
object SparkCreateBaselineDataFrame {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()

    // The expected arguments (workingPath, master) are declared in the
    // companion json resource.
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        SparkCreateBaselineDataFrame.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json"),
        StandardCharsets.UTF_8))
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(SparkCreateBaselineDataFrame.getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    val sc = spark.sparkContext

    val workingPath = parser.get("workingPath")

    // Register kryo encoders for the PubMed model classes so they can be
    // stored in a typed Dataset.
    implicit val PMEncoder: Encoder[PMArticle] = Encoders.kryo(classOf[PMArticle])
    implicit val PMJEncoder: Encoder[PMJournal] = Encoders.kryo(classOf[PMJournal])
    implicit val PMAEncoder: Encoder[PMAuthor] = Encoders.kryo(classOf[PMAuthor])

    // Read each baseline dump as a whole (path, content) pair; the second
    // argument is the minimum number of partitions.
    val k: RDD[(String, String)] = sc.wholeTextFiles(s"$workingPath/baseline", 2000)

    // Keep only the gzipped dumps and stream-parse each one into PMArticle
    // records (PMParser behaves as an iterator over the parsed articles).
    val ds: Dataset[PMArticle] = spark.createDataset(
      k.filter(i => i._1.endsWith(".gz"))
        .flatMap { i =>
          val xml = new XMLEventReader(Source.fromString(i._2))
          new PMParser(xml)
        })

    // Overwrite the output of any previous run.
    ds.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset")
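
    // A minimal sketch (not executed by this job, assuming the same kryo
    // encoder is in scope): downstream readers can load the result back with
    //   spark.read.load(s"$workingPath/baseline_dataset").as[PMArticle]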
  }
}