From 4cd6852ad92e4aadff2a66ca5c9f236afccacf6e Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 2 May 2024 13:53:57 +0200
Subject: [PATCH] minor fix

---
 airflow/dags/run_spark.py | 112 +++++---------------------------------
 1 file changed, 15 insertions(+), 97 deletions(-)

diff --git a/airflow/dags/run_spark.py b/airflow/dags/run_spark.py
index c217970..a3c1290 100644
--- a/airflow/dags/run_spark.py
+++ b/airflow/dags/run_spark.py
@@ -27,6 +27,7 @@ https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
 
 from os import path
 from datetime import timedelta, datetime
+from spark_configurator import SparkConfigurator
 
 # [START import_module]
 # The DAG object; we'll need this to instantiate a DAG
@@ -52,92 +53,16 @@ default_args = {
     'retries': 3
 }
 
-spec = {
-    "apiVersion": "sparkoperator.k8s.io/v1beta2",
-    "kind": "SparkApplication",
-    "metadata": {
-        "name": "spark-scholix",
-        "namespace": "dnet-spark-jobs"
-    },
-    "spec": {
-        "type": "Scala",
-        "mode": "cluster",
-        "image": "dnet-spark:1.0.0",
-        "imagePullPolicy": "IfNotPresent",
-        "mainClass": "eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
-        "mainApplicationFile": "s3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar",
-        "arguments": [
-            "--sourcePath",
-            "s3a://raw-graph/01",
-            "--targetPath",
-            "s3a://scholix"
-        ],
-        "sparkVersion": "3.5.1",
-        "sparkConf": {
-            "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
-            "spark.executor.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
-            "spark.hadoop.fs.defaultFS": "s3a://scholix",
-            "spark.hadoop.fs.s3a.access.key": "minio",
-            "spark.hadoop.fs.s3a.secret.key": "minio123",
-            "spark.hadoop.fs.s3a.endpoint": "https://minio.dnet-minio-tenant.svc.cluster.local",
-            "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
-            "spark.hadoop.fs.s3a.path.style.access": "true",
-            "spark.hadoop.fs.s3a.attempts.maximum": "1",
-            "spark.hadoop.fs.s3a.connection.establish.timeout": "5000",
-            "spark.hadoop.fs.s3a.connection.timeout": "10001",
-            "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
-            "com.amazonaws.sdk.disableCertChecking": "true",
-            "com.cloudera.com.amazonaws.sdk.disableCertChecking": "true",
-            "fs.s3a.connection.ssl.strictverify": "false",
-            "fs.s3a.connection.ssl.enabled": "false",
-            "fs.s3a.ssl.enabled": "false",
-            "spark.hadoop.fs.s3a.ssl.enabled": "false"
-        },
-        "restartPolicy": {
-            "type": "Never"
-        },
-        "volumes": [
-            {
-                "name": "test-volume",
-                "persistentVolumeClaim": {
-                    "claimName": "my-spark-pvc-tmp"
-                }
-            }
-        ],
-        "driver": {
-            "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
-            "cores": 1,
-            "coreLimit": "1200m",
-            "memory": "2G",
-            "labels": {
-                "version": "3.5.1"
-            },
-            "serviceAccount": "spark",
-            "volumeMounts": [
-                {
-                    "name": "test-volume",
-                    "mountPath": "/tmp"
-                }
-            ]
-        },
-        "executor": {
-            "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true",
-            "cores": 10,
-            "memoryOverhead": "3G",
-            "memory": "4G",
-            "instances": 1,
-            "labels": {
-                "version": "3.5.1"
-            },
-            "volumeMounts": [
-                {
-                    "name": "test-volume",
-                    "mountPath": "/tmp"
-                }
-            ]
-        }
-    }
-}
+spec = SparkConfigurator(
+    name="spark-scholix",
+    mainClass="eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump",
+    jarLocation="s3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar",
+    arguments=["--sourcePath", "s3a://raw-graph/01", "--targetPath", "s3a://scholix"],
+    executor_cores=10,
+    executor_memory="3G",
+    executor_instances=1,
+    executor_memoryOverhead="3G"
+).get_configuration()
 
 dag = DAG(
     'spark_scholix',
@@ -151,19 +76,12 @@ submit = SparkKubernetesOperator(
     namespace='dnet-spark-jobs',
     template_spec=spec,
     kubernetes_conn_id="kubernetes_default",
-# do_xcom_push=True,
-# delete_on_termination=True,
+    # do_xcom_push=True,
+    # delete_on_termination=True,
     base_container_name="spark-kubernetes-driver",
     dag=dag
 )
 
-# sensor = SparkKubernetesSensor(
-#     task_id='spark_pi_monitor',
-#     namespace='lot1-spark-jobs',
-#     application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
-#     kubernetes_conn_id="kubernetes_default",
-#     dag=dag,
-#     attach_log=False
-# )
-submit
\ No newline at end of file
+
+submit
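
Note: the spark_configurator module imported above is not part of this patch. As a rough, hypothetical sketch only, SparkConfigurator can be read as a small factory that rebuilds the SparkApplication spec dictionary removed by this commit from the handful of parameters passed in; the class name and call shape come from the diff, but the defaults and the body below are assumptions, not the actual helper:

class SparkConfigurator:
    """Hypothetical stand-in for the helper imported by this patch."""

    def __init__(self, name, mainClass, jarLocation, arguments,
                 executor_cores=1, executor_memory="2G",
                 executor_instances=1, executor_memoryOverhead="1G"):
        self.name = name
        self.mainClass = mainClass
        self.jarLocation = jarLocation
        self.arguments = arguments
        self.executor_cores = executor_cores
        self.executor_memory = executor_memory
        self.executor_instances = executor_instances
        self.executor_memoryOverhead = executor_memoryOverhead

    def get_configuration(self):
        # Rebuilds a SparkApplication spec along the lines of the inline
        # dictionary this commit removes (sparkConf, volumes and driver
        # settings omitted for brevity).
        return {
            "apiVersion": "sparkoperator.k8s.io/v1beta2",
            "kind": "SparkApplication",
            "metadata": {"name": self.name, "namespace": "dnet-spark-jobs"},
            "spec": {
                "type": "Scala",
                "mode": "cluster",
                "mainClass": self.mainClass,
                "mainApplicationFile": self.jarLocation,
                "arguments": self.arguments,
                "sparkVersion": "3.5.1",
                "executor": {
                    "cores": self.executor_cores,
                    "memory": self.executor_memory,
                    "memoryOverhead": self.executor_memoryOverhead,
                    "instances": self.executor_instances,
                },
            },
        }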