1
0
Fork 0

added kryo encoding for input files

This commit is contained in:
Miriam Baglioni 2021-06-18 09:42:07 +02:00
parent 3585e53da3
commit 6aca0d8ebb
1 changed files with 3 additions and 3 deletions

View File

@ -4,9 +4,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf import eu.dnetlib.dhp.schema.oaf
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
@ -17,6 +17,7 @@ object SparkMapDumpIntoOAF {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
@ -35,7 +36,6 @@ object SparkMapDumpIntoOAF {
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset] implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
import spark.implicits._
spark.read.load(parser.get("sourcePath")).as[CrossrefDT] spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
.flatMap(k => Crossref2Oaf.convert(k.json)) .flatMap(k => Crossref2Oaf.convert(k.json))