From 6aca0d8ebb0eab94befbb16c8807a508b9c1a319 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 18 Jun 2021 09:42:07 +0200
Subject: [PATCH] added kryo encoding for input files

---
 .../eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index 0272cb1a6..57acaf404 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -4,9 +4,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
 import org.apache.commons.io.IOUtils
-import org.apache.hadoop.io.{IntWritable, Text}
+
 import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
+
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -17,6 +17,7 @@ object SparkMapDumpIntoOAF {
 
   def main(args: Array[String]): Unit = {
 
+    implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
     val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
     val conf: SparkConf = new SparkConf()
 
@@ -35,7 +36,6 @@ object SparkMapDumpIntoOAF {
     implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
 
     val targetPath = parser.get("targetPath")
-    import spark.implicits._
 
     spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
       .flatMap(k => Crossref2Oaf.convert(k.json))
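
A brief note on the pattern this patch introduces: declaring an implicit Encoder obtained from Encoders.kryo lets spark.read.load(...).as[CrossrefDT] deserialize the dump's records back into typed objects, which is why the product encoders from spark.implicits._ are no longer needed in this job. The sketch below is illustrative only, not the repo's actual code; the CrossrefDT fields, the object name, the input path, and the trailing map are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical stand-in for the repo's CrossrefDT, only to keep the sketch self-contained.
case class CrossrefDT(doi: String, json: String, timestamp: Long)

object KryoEncodingSketch {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .config(new SparkConf())
      .appName("KryoEncodingSketch")
      .master("local[*]")
      .getOrCreate()

    // With a Kryo encoder the dataset is serialized as a single binary column,
    // so no case-class encoder from spark.implicits._ is required.
    implicit val crossrefEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]

    // Read a dump previously written with the same Kryo encoding and
    // deserialize each record back into CrossrefDT (placeholder path).
    val input: Dataset[CrossrefDT] =
      spark.read.load("/tmp/crossref_dump").as[CrossrefDT]

    // Downstream steps can then operate on the typed records, e.g. pulling out
    // the raw JSON payload as the patched job does before Crossref2Oaf.convert.
    input.map(_.json)(Encoders.STRING).show(5, truncate = false)

    spark.stop()
  }
}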