From 908294d86e03a199c2f3e9db98fcdb689c66db48 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 5 Jan 2022 15:49:05 +0100 Subject: [PATCH] OAF-store-graph mdstores: firther fix for PR#180 --- .../raw/CopyHdfsOafSparkApplication.scala | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala index 1376c6b35..fa13f477c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/oa/graph/raw/CopyHdfsOafSparkApplication.scala @@ -48,26 +48,27 @@ object CopyHdfsOafSparkApplication { log.info("hdfsPath: {}", hdfsPath) implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - import spark.implicits._ val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList + val types = ModelSupport.oafTypes.entrySet + .asScala + .map(e => Tuple2(e.getKey, e.getValue)) + if (validPaths.nonEmpty) { - val oaf = spark.read.load(validPaths: _*).as[String] + val oaf = spark.read.textFile(validPaths: _*) val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) - val l = ModelSupport.oafTypes.entrySet.asScala.toList - l.foreach( - e => - oaf - .filter(o => isOafType(o, e.getKey)) - .map(j => mapper.readValue(j, e.getValue).asInstanceOf[Oaf]) - .map(s => mapper.writeValueAsString(s))(Encoders.STRING) - .write - .option("compression", "gzip") - .mode(SaveMode.Append) - .text(s"$hdfsPath/${e}") + + types.foreach(t => oaf + .filter(o => isOafType(o, t._1)) + .map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf]) + .map(s => mapper.writeValueAsString(s))(Encoders.STRING) + .write + .option("compression", "gzip") + .mode(SaveMode.Append) + .text(s"$hdfsPath/${t._1}") ) } }