diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index 0272cb1a6..57acaf404 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -4,9 +4,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf
 import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
 import org.apache.commons.io.IOUtils
-import org.apache.hadoop.io.{IntWritable, Text}
+
 import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
+
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -17,6 +17,7 @@ object SparkMapDumpIntoOAF {
 
   def main(args: Array[String]): Unit = {
 
+    implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
     val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
     val conf: SparkConf = new SparkConf()
 
@@ -35,7 +36,6 @@ object SparkMapDumpIntoOAF {
     implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
 
     val targetPath = parser.get("targetPath")
 
-    import spark.implicits._
     spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
       .flatMap(k => Crossref2Oaf.convert(k.json))