diff --git a/airflow/dags/run_spark.py b/airflow/dags/run_spark.py index 22310b4..c217970 100644 --- a/airflow/dags/run_spark.py +++ b/airflow/dags/run_spark.py @@ -52,46 +52,102 @@ default_args = { 'retries': 3 } -spec = {'apiVersion': 'sparkoperator.k8s.io/v1beta2', - 'kind': 'SparkApplication', - 'metadata': { - 'name': 'spark-pi-{{ ds }}-{{ task_instance.try_number }}', - 'namespace': 'dnet-spark-jobs' +spec = { + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "metadata": { + "name": "spark-scholix", + "namespace": "dnet-spark-jobs" + }, + "spec": { + "type": "Scala", + "mode": "cluster", + "image": "dnet-spark:1.0.0", + "imagePullPolicy": "IfNotPresent", + "mainClass": "eu.dnetlib.dhp.sx.graph.SparkCreateScholexplorerDump", + "mainApplicationFile": "s3a://deps/dhp-shade-package-1.2.5-SNAPSHOT.jar", + "arguments": [ + "--sourcePath", + "s3a://raw-graph/01", + "--targetPath", + "s3a://scholix" + ], + "sparkVersion": "3.5.1", + "sparkConf": { + "spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "spark.executor.extraJavaOptions": "-Divy.cache.dir=/tmp -Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "spark.hadoop.fs.defaultFS": "s3a://scholix", + "spark.hadoop.fs.s3a.access.key": "minio", + "spark.hadoop.fs.s3a.secret.key": "minio123", + "spark.hadoop.fs.s3a.endpoint": "https://minio.dnet-minio-tenant.svc.cluster.local", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.path.style.access": "true", + "spark.hadoop.fs.s3a.attempts.maximum": "1", + "spark.hadoop.fs.s3a.connection.establish.timeout": "5000", + "spark.hadoop.fs.s3a.connection.timeout": "10001", + "spark.hadoop.fs.s3a.connection.ssl.enabled": "false", + "com.amazonaws.sdk.disableCertChecking": "true", + "com.cloudera.com.amazonaws.sdk.disableCertChecking": "true", + "fs.s3a.connection.ssl.strictverify": "false", + "fs.s3a.connection.ssl.enabled": "false", + "fs.s3a.ssl.enabled": "false", + "spark.hadoop.fs.s3a.ssl.enabled": "false" }, - 'spec': { - 'type': 'Scala', - 'mode': 'cluster', - 'image': 'dnet-spark:1.0.0', - 'imagePullPolicy': 'IfNotPresent', - 'mainApplicationFile': 'local:///opt/spark/examples/jars/spark-examples_2.12-3.5.1.jar', - 'mainClass': 'org.apache.spark.examples.SparkPi', - 'sparkVersion': '3.5.1', - 'restartPolicy': {'type': 'Never'}, -# 'arguments': ['{{ds}}'], - 'driver': { - 'coreLimit': '1200m', - 'cores': 1, - 'labels': {'version': '3.5.1'}, - 'memory': '1g', - 'serviceAccount': 'spark', - }, - 'executor': { - 'cores': 1, - 'instances': 1, - 'memory': '512m', - 'labels': {'version': '3.5.1'} + "restartPolicy": { + "type": "Never" + }, + "volumes": [ + { + "name": "test-volume", + "persistentVolumeClaim": { + "claimName": "my-spark-pvc-tmp" + } } - }} + ], + "driver": { + "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "cores": 1, + "coreLimit": "1200m", + "memory": "2G", + "labels": { + "version": "3.5.1" + }, + "serviceAccount": "spark", + "volumeMounts": [ + { + "name": "test-volume", + "mountPath": "/tmp" + } + ] + }, + "executor": { + "javaOptions": "-Dcom.amazonaws.sdk.disableCertChecking=true -Dcom.cloudera.com.amazonaws.sdk.disableCertChecking=true", + "cores": 10, + "memoryOverhead": "3G", + "memory": "4G", + "instances": 1, + "labels": { + "version": "3.5.1" + }, + "volumeMounts": [ + { + "name": "test-volume", + "mountPath": "/tmp" + } + ] + } + } +} dag = DAG( - 'spark_pi', + 'spark_scholix', default_args=default_args, schedule_interval=None, tags=['example', 'spark'] ) submit = SparkKubernetesOperator( - task_id='spark_pi_submit', + task_id='spark_scholix_submit', namespace='dnet-spark-jobs', template_spec=spec, kubernetes_conn_id="kubernetes_default",