From d1afcd4395efa05b771d05225f13dc0f01929c84 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Wed, 16 Oct 2024 14:08:00 +0200
Subject: [PATCH] fixed import

---
 main.tf                    |   3 +-
 variables.tf               |   4 ++
 workflow/dnet/run_spark.py | 124 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 129 insertions(+), 2 deletions(-)

diff --git a/main.tf b/main.tf
index 20a2ffd..4456f38 100644
--- a/main.tf
+++ b/main.tf
@@ -18,6 +18,7 @@ module "airflow" {
   s3_endpoint = var.s3_endpoint
   s3_key = var.s3_key
   s3_secret = var.s3_secret
-  branch_name= var.dag_branch_name
+  branch_name = var.dag_branch_name
+  dag_path= var.dag_path_name
 }
 
diff --git a/variables.tf b/variables.tf
index f624ad3..8390108 100644
--- a/variables.tf
+++ b/variables.tf
@@ -58,3 +58,7 @@ variable "dag_branch_name" {
   default = "master"
 }
 
+variable "dag_path_name" {
+  default = "workflow/dnet"
+}
+
diff --git a/workflow/dnet/run_spark.py b/workflow/dnet/run_spark.py
index 87c54bd..3a2530a 100644
--- a/workflow/dnet/run_spark.py
+++ b/workflow/dnet/run_spark.py
@@ -27,7 +27,7 @@ https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
 from os import path
 from datetime import timedelta, datetime
 
-from workflow.dnet.spark_configurator import SparkConfigurator
+# from workflow.dnet.spark_configurator import SparkConfigurator
 
 # [START import_module]
 # The DAG object; we'll need this to instantiate a DAG
@@ -40,6 +40,128 @@ from airflow.utils.dates import days_ago
 # [END import_module]
 
+
+class SparkConfigurator:
+    def __init__(self,
+                 name,
+                 mainClass,
+                 jarLocation:str,
+                 arguments,
+                 apiVersion=None,
+                 namespace="dnet-spark-jobs",
+                 image= "dnet-spark:1.0.0",
+                 driver_cores=1,
+                 driver_memory='1G',
+                 executor_cores=1,
+                 executor_memory="1G",
+                 executor_memoryOverhead= "1G",
+                 executor_instances=1
+                 ) -> None:
+        if apiVersion:
+            self.apiVersion = apiVersion
+        else:
+            self.apiVersion = "sparkoperator.k8s.io/v1beta2"
+        self.namespace= namespace
+        self.name = name
+        self.image= image
+        self.mainClass = mainClass
+        self.jarLocation = jarLocation
+        self.arguments= arguments
+        self.s3Configuration = {
+            "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
+            "spark.executor.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
+            "spark.hadoop.fs.defaultFS": "s3a://spark",
+            "spark.hadoop.fs.s3a.access.key": "minio",
+            "spark.hadoop.fs.s3a.secret.key": "minio123",
+            "spark.hadoop.fs.s3a.endpoint": "https://minio.dnet-minio-tenant.svc.cluster.local",
+            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+            "spark.hadoop.fs.s3a.path.style.access": "true",
+            "spark.hadoop.fs.s3a.attempts.maximum": "1",
+            "spark.hadoop.fs.s3a.connection.establish.timeout": "5000",
+            "spark.hadoop.fs.s3a.connection.timeout": "10001",
+            "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
+            "com.amazonaws.sdk.disableCertChecking": "true",
+            "com.cloudera.com.amazonaws.sdk.disableCertChecking": "true",
+            "fs.s3a.connection.ssl.strictverify": "false",
+            "fs.s3a.connection.ssl.enabled": "false",
+            "fs.s3a.ssl.enabled": "false",
+            "spark.hadoop.fs.s3a.ssl.enabled": "false"
+        }
+        self.sparkResoruceConf= {
+            'driver_cores':driver_cores,
+            'driver_memory':driver_memory,
+            'executor_cores':executor_cores,
+            'executor_memory':executor_memory,
+            'executor_instances':executor_instances,
+            'memoryOverhead':executor_memoryOverhead
+
+        }
+
+    def get_configuration(self) -> dict:
+        return {
+            "apiVersion": self.apiVersion,
+            "kind": "SparkApplication",
+            "metadata": {
+                "name": self.name,
+                "namespace": self.namespace
+            },
+            "spec": {
+                "type": "Scala",
+                "mode": "cluster",
+                "image":self.image,
+                "imagePullPolicy": "IfNotPresent",
+                "mainClass": self.mainClass,
+                "mainApplicationFile": self.jarLocation,
+                "arguments": self.arguments,
+                "sparkVersion": "3.5.1",
+                "sparkConf": self.s3Configuration,
+                "restartPolicy": {
+                    "type": "Never"
+                },
+                "volumes": [
+                    {
+                        "name": "test-volume",
+                        "persistentVolumeClaim": {
+                            "claimName": "my-spark-pvc-tmp"
+                        }
+                    }
+                ],
+                "driver": {
+                    "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
+                    "cores": self.sparkResoruceConf['driver_cores'],
+                    "coreLimit": "1200m",
+                    "memory": self.sparkResoruceConf['driver_memory'],
+                    "labels": {
+                        "version": "3.5.1"
+                    },
+                    "serviceAccount": "spark",
+                    "volumeMounts": [
+                        {
+                            "name": "test-volume",
+                            "mountPath": "/tmp"
+                        }
+                    ]
+                },
+                "executor": {
+                    "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
+                    "cores": self.sparkResoruceConf['executor_cores'],
+                    "memoryOverhead": self.sparkResoruceConf['memoryOverhead'],
+                    "memory": self.sparkResoruceConf['executor_memory'],
+                    "instances": self.sparkResoruceConf['executor_instances'],
+                    "labels": {
+                        "version": "3.5.1"
+                    },
+                    "volumeMounts": [
+                        {
+                            "name": "test-volume",
+                            "mountPath": "/tmp"
+                        }
+                    ]
+                }
+            }
+        }
+
+
 # [START default_args]
 # These args will get passed on to each operator
 # You can override them on a per-task basis during operator initialization