diff --git a/airflow/dags/spark_configurator.py b/airflow/dags/spark_configurator.py new file mode 100644 index 0000000..c2794f5 --- /dev/null +++ b/airflow/dags/spark_configurator.py @@ -0,0 +1,119 @@ +class SparkConfigurator: + def __init__(self, + name, + mainClass, + jarLocation, + arguments, + apiVersion=None, + namespace="dnet-spark-jobs", + image= "dnet-spark:1.0.0", + driver_cores=1, + driver_memory='1G', + executor_cores=1, + executor_memory="1G", + executor_memoryOverhead= "1G", + executor_instances=1 + ) -> None: + if apiVersion: + self.apiVersion = apiVersion + else: + self.apiVersion = "sparkoperator.k8s.io/v1beta2" + self.namespace= namespace + self.name = name + self.image= image + self.mainClass = mainClass + self.jarLocation = jarLocation, + self.arguments= arguments + self.s3Configuration = { + "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "spark.executor.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "spark.hadoop.fs.defaultFS": "s3a://spark", + "spark.hadoop.fs.s3a.access.key": "minio", + "spark.hadoop.fs.s3a.secret.key": "minio123", + "spark.hadoop.fs.s3a.endpoint": "https://minio.dnet-minio-tenant.svc.cluster.local", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.path.style.access": "true", + "spark.hadoop.fs.s3a.attempts.maximum": "1", + "spark.hadoop.fs.s3a.connection.establish.timeout": "5000", + "spark.hadoop.fs.s3a.connection.timeout": "10001", + "spark.hadoop.fs.s3a.connection.ssl.enabled": "false", + "com.amazonaws.sdk.disableCertChecking": "true", + "com.cloudera.com.amazonaws.sdk.disableCertChecking": "true", + "fs.s3a.connection.ssl.strictverify": "false", + "fs.s3a.connection.ssl.enabled": "false", + "fs.s3a.ssl.enabled": "false", + "spark.hadoop.fs.s3a.ssl.enabled": "false" + } + self.sparkResoruceConf= { + 'driver_cores':driver_cores, + 'driver_memory':driver_memory, + 'executor_cores':executor_cores, + 'executor_memory':executor_memory, + 'executor_instances':executor_instances, + 'memoryOverhead':executor_memoryOverhead + + } + + def get_configuration(self) -> dict: + return { + "apiVersion": self.apiVersion, + "kind": "SparkApplication", + "metadata": { + "name": self.name, + "namespace": self.namespace + }, + "spec": { + "type": "Scala", + "mode": "cluster", + "image":self.image, + "imagePullPolicy": "IfNotPresent", + "mainClass": self.mainClass, + "mainApplicationFile": self.mainClass, + "arguments": self.arguments, + "sparkVersion": "3.5.1", + "sparkConf": self.s3Configuration, + "restartPolicy": { + "type": "Never" + }, + "volumes": [ + { + "name": "test-volume", + "persistentVolumeClaim": { + "claimName": "my-spark-pvc-tmp" + } + } + ], + "driver": { + "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "cores": self.sparkResoruceConf['driver_cores'], + "coreLimit": "1200m", + "memory": self.sparkResoruceConf['driver_memory'], + "labels": { + "version": "3.5.1" + }, + "serviceAccount": "spark", + "volumeMounts": [ + { + "name": "test-volume", + "mountPath": "/tmp" + } + ] + }, + "executor": { + "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "cores": self.sparkResoruceConf['executor_cores'], + "memoryOverhead": self.sparkResoruceConf['memoryOverhead'], + "memory": self.sparkResoruceConf['executor_memory'], + "instances": self.sparkResoruceConf['executor_instances'], + "labels": { + "version": "3.5.1" + }, + "volumeMounts": [ + { + "name": "test-volume", + "mountPath": "/tmp" + } + ] + } + } + }