diff --git a/airflow/dags/run_spark.py b/airflow/dags/run_spark.py
new file mode 100644
index 0000000..304a2ff
--- /dev/null
+++ b/airflow/dags/run_spark.py
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses SparkKubernetesOperator and SparkKubernetesSensor.
+In this example, we create two tasks which execute sequentially.
+The first task submits a SparkApplication to the Kubernetes cluster (the example uses the spark-pi application),
+and the second task checks the final state of the SparkApplication submitted by the first task.
+
+The spark-on-k8s operator must already be installed on the Kubernetes cluster:
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
+"""
+
+from os import path
+from datetime import timedelta, datetime
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
+from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
+from airflow.utils.dates import days_ago
+
+# [END import_module]
+
+# [START default_args]
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': days_ago(1),
+    'email': ['airflow@example.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 3
+}
+
+spec = {'apiVersion': 'sparkoperator.k8s.io/v1beta2',
+        'kind': 'SparkApplication',
+        'metadata': {
+            'name': 'spark-pi-{{ ds }}-{{ task_instance.try_number }}',
+            'namespace': 'dnet-spark-jobs'
+        },
+        'spec': {
+            'type': 'Scala',
+            'mode': 'cluster',
+            'image': 'dnet-spark:1.0.0',
+            'imagePullPolicy': 'IfNotPresent',
+            'mainApplicationFile': 'local:///opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar',
+            'mainClass': 'org.apache.spark.examples.SparkPi',
+            'sparkVersion': '3.5.1',
+            'restartPolicy': {'type': 'Never'},
+            # 'arguments': ['{{ds}}'],
+            'driver': {
+                'coreLimit': '1200m',
+                'cores': 1,
+                'labels': {'version': '3.1.3'},
+                'memory': '1g',
+                'serviceAccount': 'spark',
+            },
+            'executor': {
+                'cores': 1,
+                'instances': 1,
+                'memory': '512m',
+                'labels': {'version': '3.1.3'}
+            }
+        }}
+
+dag = DAG(
+    'spark_pi',
+    default_args=default_args,
+    schedule_interval=None,
+    max_active_runs=1,
+    tags=['example', 'spark']
+)
+
+submit = SparkKubernetesOperator(
+    task_id='spark_pi_submit',
+    namespace='lot1-spark-jobs',
+    template_spec=spec,
+    kubernetes_conn_id="kubernetes_default",
+    # do_xcom_push=True,
+    # delete_on_termination=True,
+    base_container_name="spark-kubernetes-driver",
+    dag=dag
+)
+
+# sensor = SparkKubernetesSensor(
+#     task_id='spark_pi_monitor',
+#     namespace='lot1-spark-jobs',
+#     application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
+#     kubernetes_conn_id="kubernetes_default",
+#     dag=dag,
+#     attach_log=False
+# )
+
+submit
\ No newline at end of file
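
Note: the monitor task is committed commented out, so this DAG currently runs only the submit step even though the docstring describes two sequential tasks. Below is a minimal sketch (not part of the commit) of how the sensor could be wired up once enabled, assuming `do_xcom_push=True` is also uncommented on the submit task so the sensor can pull the submitted application's name from XCom:

    # Hypothetical continuation of the DAG above; reuses the `dag` object and the
    # SparkKubernetesSensor import that the file already declares.
    sensor = SparkKubernetesSensor(
        task_id='spark_pi_monitor',
        namespace='lot1-spark-jobs',
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_default",
        dag=dag,
        attach_log=False
    )

    # The sensor runs only after the submit task completes.
    submit >> sensor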