package eu.dnetlib.dhp.application

import eu.dnetlib.dhp.common.Constants
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.Logger

import scala.io.Source

/** This is the main interface, SparkScalaApplication,
  * that all Spark Scala application classes should extend.
  */
trait SparkScalaApplication {

  /** This is the classpath location of the JSON file
    * that describes all the arguments needed to run.
    */
  val propertyPath: String

  /** Utility to parse the arguments using the
    * property JSON in the classpath identified by
    * the variable propertyPath.
    *
    * @param args the list of arguments
    */
  def parseArguments(args: Array[String]): ArgumentApplicationParser = {
    val parser = new ArgumentApplicationParser(
      Source.fromInputStream(getClass.getResourceAsStream(propertyPath)).mkString
    )
    parser.parseArgument(args)
    parser
  }
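
  // A minimal sketch of the descriptor JSON that parseArguments expects to find
  // at propertyPath. The field names (paramName, paramLongName, paramDescription,
  // paramRequired) are assumptions based on common ArgumentApplicationParser
  // conventions rather than anything defined in this file:
  //
  //   [
  //     {
  //       "paramName": "mt",
  //       "paramLongName": "master",
  //       "paramDescription": "the Spark master, e.g. yarn or local[*]",
  //       "paramRequired": true
  //     }
  //   ]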

  /** Every Spark application runs this method,
    * in which the whole logic of the Spark node is defined.
    */
  def run(): Unit
}

abstract class AbstractScalaApplication(
  val propertyPath: String,
  val args: Array[String],
  log: Logger
) extends SparkScalaApplication {

  // Both are populated by initialize() and remain null until it is called.
  var parser: ArgumentApplicationParser = null
  var spark: SparkSession = null

  /** Parses the arguments and creates the Spark session.
    *
    * @return this application, to allow chaining with run()
    */
  def initialize(): SparkScalaApplication = {
    parser = parseArguments(args)
    spark = createSparkSession()
    this
  }

  /** Utility for creating a Spark session starting from the parser.
    *
    * @return a Spark session
    */
  private def createSparkSession(): SparkSession = {
    require(parser != null)

    val conf: SparkConf = new SparkConf()
    val master = parser.get("master")
    log.info(s"Creating Spark session: Master: $master")
    SparkSession
      .builder()
      .config(conf)
      .appName(getClass.getSimpleName)
      .master(master)
      .getOrCreate()
  }
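
  // Note: createSparkSession reads a "master" argument from the parsed
  // parameters (e.g. --master yarn or --master local[*]), so the descriptor
  // JSON of every concrete application is expected to declare that parameter.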

  /** Counts the items written at targetPath and records the total
    * as the mdstore size file under outputBasePath.
    */
  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
    val totalItems = spark.read.text(targetPath).count()
    writeHdfsFile(
      spark.sparkContext.hadoopConfiguration,
      s"$totalItems",
      outputBasePath + Constants.MDSTORE_SIZE_PATH
    )
  }
}
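
// A minimal sketch of a concrete application built on AbstractScalaApplication.
// GenerateDataset, its descriptor path, and the sourcePath/targetPath/outputBasePath
// parameters are hypothetical illustrations, not part of this file's contract.
class GenerateDataset(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log) {

  override def run(): Unit = {
    // parser and spark are already populated here by initialize()
    val sourcePath = parser.get("sourcePath")
    val targetPath = parser.get("targetPath")
    val outputBasePath = parser.get("outputBasePath")
    spark.read.text(sourcePath).write.text(targetPath)
    reportTotalSize(targetPath, outputBasePath)
  }
}

object GenerateDataset {

  def main(args: Array[String]): Unit = {
    val log = org.slf4j.LoggerFactory.getLogger(getClass)
    new GenerateDataset("/eu/dnetlib/dhp/generate_dataset_params.json", args, log)
      .initialize()
      .run()
  }
}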