OAF-store-graph mdstores: firther fix for PR#180
This commit is contained in:
parent
3bd3653be9
commit
8ae46ca789
|
@ -48,26 +48,27 @@ object CopyHdfsOafSparkApplication {
|
|||
log.info("hdfsPath: {}", hdfsPath)
|
||||
|
||||
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||
import spark.implicits._
|
||||
|
||||
val paths = DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala
|
||||
|
||||
val validPaths: List[String] = paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList
|
||||
|
||||
val types = ModelSupport.oafTypes.entrySet
|
||||
.asScala
|
||||
.map(e => Tuple2(e.getKey, e.getValue))
|
||||
|
||||
if (validPaths.nonEmpty) {
|
||||
val oaf = spark.read.load(validPaths: _*).as[String]
|
||||
val oaf = spark.read.textFile(validPaths: _*)
|
||||
val mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
val l = ModelSupport.oafTypes.entrySet.asScala.toList
|
||||
l.foreach(
|
||||
e =>
|
||||
oaf
|
||||
.filter(o => isOafType(o, e.getKey))
|
||||
.map(j => mapper.readValue(j, e.getValue).asInstanceOf[Oaf])
|
||||
.map(s => mapper.writeValueAsString(s))(Encoders.STRING)
|
||||
.write
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Append)
|
||||
.text(s"$hdfsPath/${e}")
|
||||
|
||||
types.foreach(t => oaf
|
||||
.filter(o => isOafType(o, t._1))
|
||||
.map(j => mapper.readValue(j, t._2).asInstanceOf[Oaf])
|
||||
.map(s => mapper.writeValueAsString(s))(Encoders.STRING)
|
||||
.write
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Append)
|
||||
.text(s"$hdfsPath/${t._1}")
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue