2021-10-26 09:40:47 +02:00
|
|
|
package eu.dnetlib.dhp.oa.graph.raw
|
|
|
|
|
2022-01-03 17:25:26 +01:00
|
|
|
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
|
2021-10-26 09:40:47 +02:00
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|
|
|
import eu.dnetlib.dhp.common.HdfsSupport
|
|
|
|
import eu.dnetlib.dhp.schema.common.ModelSupport
|
|
|
|
import eu.dnetlib.dhp.schema.oaf.Oaf
|
|
|
|
import eu.dnetlib.dhp.utils.DHPUtils
|
|
|
|
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
|
|
|
import org.apache.spark.{SparkConf, SparkContext}
|
2022-01-03 17:25:26 +01:00
|
|
|
import org.json4s.DefaultFormats
|
|
|
|
import org.json4s.jackson.JsonMethods.parse
|
2021-10-26 09:40:47 +02:00
|
|
|
import org.slf4j.LoggerFactory
|
2022-01-03 17:25:26 +01:00
|
|
|
|
2021-10-26 09:40:47 +02:00
|
|
|
import scala.collection.JavaConverters._
|
|
|
|
import scala.io.Source
|
|
|
|
|
|
|
|
object CopyHdfsOafSparkApplication {

  /** Spark driver that copies Oaf records out of a set of mdstores on HDFS into the
    * raw-graph output directory, writing one gzip-compressed text partition per Oaf
    * entity type (publication, dataset, relation, ...).
    *
    * Expected arguments (see copy_hdfs_oaf_parameters.json):
    *  - master:            Spark master URL
    *  - mdstoreManagerUrl: base URL of the mdstore manager service
    *  - mdFormat / mdLayout / mdInterpretation: mdstore selection coordinates
    *  - hdfsPath:          base output path; records for type T land under s"$hdfsPath/T"
    */
  def main(args: Array[String]): Unit = {
    val log = LoggerFactory.getLogger(getClass)
    val conf = new SparkConf()

    val parser = new ArgumentApplicationParser(
      Source
        .fromInputStream(
          getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/copy_hdfs_oaf_parameters.json")
        )
        .mkString
    )
    parser.parseArgument(args)

    val spark =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val mdstoreManagerUrl = parser.get("mdstoreManagerUrl")
    log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl)

    val mdFormat = parser.get("mdFormat")
    log.info("mdFormat: {}", mdFormat)

    val mdLayout = parser.get("mdLayout")
    log.info("mdLayout: {}", mdLayout)

    val mdInterpretation = parser.get("mdInterpretation")
    log.info("mdInterpretation: {}", mdInterpretation)

    val hdfsPath = parser.get("hdfsPath")
    log.info("hdfsPath: {}", hdfsPath)

    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]

    import spark.implicits._

    // Resolve the candidate mdstore paths from the mdstore manager service.
    val paths =
      DHPUtils.mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation, true).asScala

    // Keep only paths that actually exist on HDFS; a registered mdstore may be empty/missing.
    val validPaths: List[String] =
      paths.filter(p => HdfsSupport.exists(p, sc.hadoopConfiguration)).toList

    if (validPaths.nonEmpty) {
      // NOTE(review): spark.read.load defaults to parquet; .as[String] assumes the
      // mdstores are parquet files with a single string (JSON) column — confirm format.
      val oaf = spark.read.load(validPaths: _*).as[String]

      // Tolerant mapper: mdstore records may carry fields unknown to the Oaf schema.
      val mapper =
        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

      // One pass per Oaf type: filter matching JSON records, round-trip them through
      // the typed model (normalizing the payload), then append as gzip text.
      val l = ModelSupport.oafTypes.entrySet.asScala.toList
      l.foreach(e =>
        oaf
          .filter(o => isOafType(o, e.getKey))
          .map(j => mapper.readValue(j, e.getValue).asInstanceOf[Oaf])
          .map(s => mapper.writeValueAsString(s))(Encoders.STRING)
          .write
          .option("compression", "gzip")
          .mode(SaveMode.Append)
          // BUG FIX: use e.getKey (the type name, e.g. "publication"). Interpolating the
          // Map.Entry itself produced paths like "<base>/publication=class ...Publication".
          .text(s"$hdfsPath/${e.getKey}")
      )
    }
  }

  /** Decides whether a raw JSON record belongs to the given Oaf type.
    *
    * @param input   the record as a JSON string
    * @param oafType the target type name (a key of ModelSupport.oafTypes)
    * @return true when the record should be routed to the oafType partition
    */
  def isOafType(input: String, oafType: String): Boolean = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: org.json4s.JValue = parse(input)
    if (oafType == "relation") {
      // Relations are identified structurally: they carry both endpoints.
      val hasSource = (json \ "source").extractOrElse[String](null)
      val hasTarget = (json \ "target").extractOrElse[String](null)

      hasSource != null && hasTarget != null
    } else {
      val hasId = (json \ "id").extractOrElse[String](null)
      // BUG FIX: default to "" (not null). Non-result entities (datasource,
      // organization, project) carry no "resulttype"; with a null default the old
      // equalsIgnoreCase comparison never matched, silently dropping those records.
      val resultType = (json \ "resulttype" \ "classid").extractOrElse[String]("")
      // startsWith("") is always true, so any entity with an id passes when no
      // resulttype is present; result records still have to match their classid.
      hasId != null && oafType.startsWith(resultType)
    }
  }
}
|