package eu.dnetlib.scholix

import com.sandro.app.AbstractScalaApplication
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, desc}
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}

class CheckMDStoreContent(args: Array[String], log: Logger)
    extends AbstractScalaApplication(args, log) {

  /** Classifies a serialized record: relations are labelled "Relation:&lt;relClass&gt;",
    * any other record is labelled with the classname of its first instance type.
    */
  def get_type(input: String): String = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: org.json4s.JValue = parse(input)

    // Relations carry a "source" field; all other records do not.
    val source = (json \ "source").extractOrElse[String](null)
    if (source != null) {
      val rel = (json \ "relClass").extract[String]
      s"Relation:$rel"
    } else {
      // Collect the classname of every instancetype found in the record.
      val l: List[String] = for {
        JObject(instance)                               <- json \\ "instance"
        JField("instancetype", JObject(instancetype))   <- instance
        JField("classname", JString(classname))         <- instancetype
      } yield classname
      // Assumes every non-relation record has at least one instance with an instancetype.
      l.head
    }
  }

  /** Reads the MDStore dump as text, classifies every record with get_type
    * and prints the number of records per typology, sorted by frequency.
    */
  def show_typologies(spark: SparkSession, path: String): Unit = {
    import spark.implicits._
    val df = spark.read.text(path).as[String]
    df.map(s => get_type(s))
      .groupBy("value")
      .agg(count("value").alias("Total"))
      .orderBy(desc("Total"))
      .show(300, false)
  }

  /** All Spark applications run this method: the whole logic of the Spark node is defined here. */
  override def run(): Unit = {
    val path = argumentMap("path")
    log.warn(s"Path is $path")
    show_typologies(spark, path)
  }
}

object CheckMDStoreContent {
  val log: Logger = LoggerFactory.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    new CheckMDStoreContent(args, log).initialize().run()
  }
}
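
/*
 * Hypothetical usage sketch, not part of the original job: it illustrates the two
 * classification branches of get_type on hand-written sample records. The JSON shapes
 * below are assumptions inferred from the fields the method reads (source/relClass and
 * instance/instancetype/classname), and the sketch assumes the AbstractScalaApplication
 * constructor is lightweight enough to be invoked without initialize() or a Spark session.
 */
object CheckMDStoreContentExample {

  def main(args: Array[String]): Unit = {
    val app = new CheckMDStoreContent(Array.empty, CheckMDStoreContent.log)

    // A relation-like record (sample values are made up): classified as "Relation:<relClass>".
    val relation =
      """{"source":"50|doi_1234","target":"50|doi_5678","relClass":"IsSupplementTo"}"""

    // A result-like record (sample values are made up): classified by the classname
    // of its first instance type.
    val result =
      """{"id":"50|doi_1234","instance":[{"instancetype":{"classid":"0001","classname":"Article"}}]}"""

    println(app.get_type(relation)) // expected: Relation:IsSupplementTo
    println(app.get_type(result))   // expected: Article
  }
}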