87 lines
4.1 KiB
Scala
87 lines
4.1 KiB
Scala
package eu.dnetlib.dhp.sx.ebi
|
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
|
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
|
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
|
|
import eu.dnetlib.scholexplorer.relation.RelationMapper
|
|
import org.apache.commons.io.IOUtils
|
|
import org.apache.spark.SparkConf
|
|
import org.apache.spark.rdd.RDD
|
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
|
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
|
|
import org.slf4j.{Logger, LoggerFactory}
|
|
|
|
import scala.collection.JavaConverters._
|
|
|
|
/** Spark job that splits the mixed `Oaf` records stored under `workingPath/oaf` into
  * publications, datasets and relations, collapses records sharing the same key through
  * the aggregators supplied by `EBIAggregator`, and writes each group to its own folder
  * under `workingPath`.
  */
object SparkCreateEBIDataFrame {

  /** Classpath location of the JSON file describing the accepted command line arguments. */
  private val ParametersResource = "/eu/dnetlib/dhp/sx/ebi/ebi_to_df_params.json"

  /** Job entry point.
    *
    * The arguments are parsed according to the bundled parameter specification; the values
    * read here are `master` (Spark master URL) and `workingPath` (base folder for the input
    * `oaf` data and for the `publication`, `dataset` and `relation` outputs).
    *
    * @param args command line arguments of the job
    */
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(SparkCreateEBIDataFrame.getClass)
    val conf: SparkConf = new SparkConf()

    // Read the argument specification with an explicit charset and always release the
    // stream: the previous code relied on the platform default charset and never closed it.
    val parametersStream = SparkCreateEBIDataFrame.getClass.getResourceAsStream(ParametersResource)
    require(parametersStream != null, s"Missing classpath resource $ParametersResource")
    val parser =
      try new ArgumentApplicationParser(IOUtils.toString(parametersStream, "UTF-8"))
      finally parametersStream.close()
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(SparkCreateEBIDataFrame.getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    // NOTE(review): `logger`, `sc` and `relationMapper` are referenced only by the disabled
    // extraction step below; they are kept so that step can be re-enabled as is.
    val sc = spark.sparkContext

    val workingPath = parser.get("workingPath")
    val relationMapper = RelationMapper.load

    // The model classes are serialized with Kryo; these encoders are also picked up
    // implicitly by the `as`/`map` calls below.
    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
    implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
    implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
    implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])

    // Disabled step, kept for reference: parses the publication and dataset XML records
    // and stores the resulting Oaf entities under "$workingPath/oaf", the input read below.
    //
    //    logger.info("Extract Publication and relation from publication_xml")
    //    val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
    //    {
    //      new ObjectMapper().readValue(s, classOf[String])
    //    }).flatMap(s => {
    //      val d = new PublicationScholexplorerParser
    //      d.parseObject(s, relationMapper).asScala.iterator})
    //
    //    val mapper = new ObjectMapper()
    //    mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
    //    spark.createDataset(oafPubsRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/oaf")
    //
    //    logger.info("Extract Publication and relation from dataset_xml")
    //    val oafDatsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/dataset_xml").map(s =>
    //    {
    //      new ObjectMapper().readValue(s, classOf[String])
    //    }).flatMap(s => {
    //      val d = new DatasetScholexplorerParser
    //      d.parseObject(s, relationMapper).asScala.iterator})
    //
    //    spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")

    // Build the reader for the mixed Oaf records once and derive the three typed views from it.
    val oaf: Dataset[Oaf] = spark.read.load(s"$workingPath/oaf").as[Oaf]

    val dataset: Dataset[DLIDataset] =
      oaf.filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
    val publication: Dataset[DLIPublication] =
      oaf.filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
    val relations: Dataset[Relation] =
      oaf.filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])

    // Publications: group by identifier and reduce each group to a single record.
    publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
      .map(p => p._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/publication")

    // Datasets: same procedure, keyed by identifier.
    dataset.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datasetEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
      .map(p => p._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/dataset")

    // Relations: the key is the (source, relType, target) triple joined with "::".
    relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(EBIAggregator.getRelationAggregator().toColumn)
      .map(p => p._2)
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")
  }
}
|