diff --git a/airflow/dags/spark_pi.py b/airflow/dags/spark_pi.py
index 86781a7..a691d18 100644
--- a/airflow/dags/spark_pi.py
+++ b/airflow/dags/spark_pi.py
@@ -51,9 +51,37 @@ default_args = {
     'max_active_runs': 1,
     'retries': 3
 }
 
-# [END default_args]
-# [START instantiate_dag]
+spec = {'apiVersion': 'sparkoperator.k8s.io/v1beta2',
+        'kind': 'SparkApplication',
+        'metadata': {
+            'name': 'spark-pi',
+            'namespace': 'lot1-spark-jobs'
+        },
+        'spec': {
+            'type': 'Scala',
+            'mode': 'cluster',
+            'image': 'apache/spark:v3.1.3',
+            'imagePullPolicy': 'Always',
+            'mainApplicationFile': 'local:///opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar',
+            'mainClass': 'org.apache.spark.examples.SparkPi',
+            'sparkVersion': '3.1.3',
+            'restartPolicy': {'type': 'Never'},
+            'arguments': ['{{ds}}'],
+            'driver': {
+                'coreLimit': '1200m',
+                'cores': 1,
+                'labels': {'version': '3.1.3'},
+                'memory': '1g',
+                'serviceAccount': 'spark',
+            },
+            'executor': {
+                'cores': 1,
+                'instances': 1,
+                'memory': '512m',
+                'labels': {'version': '3.1.3'}
+            }
+        }}
 
 dag = DAG(
     'spark_pi',
@@ -65,14 +93,13 @@ dag = DAG(
 submit = SparkKubernetesOperator(
     task_id='spark_pi_submit',
     namespace='lot1-spark-jobs',
-    application_file="/example_spark_kubernetes_operator_pi.yaml",
+    template_spec=spec,
     kubernetes_conn_id="kubernetes_default",
     do_xcom_push=True,
     in_cluster=True,
     delete_on_termination=True,
     dag=dag
 )
-submit.application_file = "/example_spark_kubernetes_operator_pi.yaml"
 
 sensor = SparkKubernetesSensor(
     task_id='spark_pi_monitor',
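
The change above replaces the external example_spark_kubernetes_operator_pi.yaml manifest with an inline dict passed via template_spec. One way to sanity-check the migration is to render the dict back to YAML and diff it against the old file. A minimal sketch, assuming PyYAML is available (it ships with standard Airflow images) and that this DAG module is importable as spark_pi:

# Minimal sketch: dump the inline spec back to YAML so it can be diffed
# against the old example_spark_kubernetes_operator_pi.yaml manifest.
# Assumes PyYAML is installed and the DAG module is importable as spark_pi.
import yaml

from spark_pi import spec

print(yaml.safe_dump(spec, sort_keys=False))

Note that template_spec is a templated field in recent versions of the cncf-kubernetes provider, so the '{{ds}}' entry in arguments is rendered by Airflow at run time.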