diff --git a/airflow/dags/example_spark_kubernetes_operator_pi.yaml b/airflow/dags/example_spark_kubernetes_operator_pi.yaml
new file mode 100644
index 0000000..9cda323
--- /dev/null
+++ b/airflow/dags/example_spark_kubernetes_operator_pi.yaml
@@ -0,0 +1,54 @@
+#
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  name: spark-pi
+  namespace: spark-jobs
+spec:
+  type: Scala
+  mode: cluster
+  image: "apache/spark:v3.1.3"
+  imagePullPolicy: Always
+  mainClass: org.apache.spark.examples.SparkPi
+  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar"
+  sparkVersion: "3.1.3"
+  restartPolicy:
+    type: Never
+  volumes:
+    - name: "test-volume"
+      hostPath:
+        path: "/tmp"
+        type: Directory
+  driver:
+    cores: 1
+    coreLimit: "1200m"
+    memory: "512m"
+    labels:
+      version: 3.1.3
+    serviceAccount: spark
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
+  executor:
+    cores: 1
+    instances: 1
+    memory: "512m"
+    labels:
+      version: 3.1.3
+    volumeMounts:
+      - name: "test-volume"
+        mountPath: "/tmp"
\ No newline at end of file
diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py
index 35fd214..f6ad045 100644
--- a/airflow/dags/skg_if_pipeline.py
+++ b/airflow/dags/skg_if_pipeline.py
@@ -11,7 +11,6 @@ import pendulum
 import requests
 from airflow.decorators import dag
 from airflow.decorators import task
-from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.utils.file import TemporaryDirectory
 
@@ -49,7 +48,8 @@ def skg_if_pipeline():
         with TemporaryDirectory() as dwl_dir:
             with TemporaryDirectory() as tmp_dir:
                 archive = f'{dwl_dir}/{key}'
-                hook.download_file(key=key, bucket_name=bucket, local_path=dwl_dir, preserve_file_name=True, use_autogenerated_subdir=False)
+                hook.download_file(key=key, bucket_name=bucket, local_path=dwl_dir, preserve_file_name=True,
+                                   use_autogenerated_subdir=False)
                 with zipfile.ZipFile(archive, 'r') as zip_ref:
                     zip_ref.extractall(tmp_dir)
 
@@ -58,7 +58,8 @@ def skg_if_pipeline():
                         if file == key:
                             continue
                         local_file_path = os.path.join(root, file)
-                        hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME, replace=True)
+                        hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME,
+                                       replace=True)
        return ""
 
    @task
@@ -90,12 +91,15 @@ def skg_if_pipeline():
            buff = io.BufferedReader(gzipfile)
            for line in buff:
                data = json.loads(line)
-                session.post(f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_doc/' + requests.utils.quote(data['local_identifier'], safe='') + "?refresh=false",
-                             data=json.dumps(data),
-                             headers={"Content-Type": "application/json"},
-                             verify=False)
-        session.post(f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh',
-                     verify=False)
+                session.post(
+                    f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_doc/' + requests.utils.quote(
+                        data['local_identifier'], safe='') + "?refresh=false",
+                    data=json.dumps(data),
+                    headers={"Content-Type": "application/json"},
+                    verify=False)
+        session.post(
+            f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh',
+            verify=False)
 
    unzip_to_s3("dump.zip", S3_BUCKET_NAME)
    bulk_load("datasource")
@@ -106,5 +110,5 @@ def skg_if_pipeline():
    bulk_load("topic")
    bulk_load("venues")
 
-skg_if_pipeline()
+skg_if_pipeline()
 
diff --git a/airflow/dags/spark_pi.py b/airflow/dags/spark_pi.py
new file mode 100644
index 0000000..e66d5c8
--- /dev/null
+++ b/airflow/dags/spark_pi.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example DAG which uses the SparkKubernetesOperator and SparkKubernetesSensor.
+In this example, we create two tasks which execute sequentially.
+The first task submits a SparkApplication to the Kubernetes cluster (the example uses the spark-pi application),
+and the second task checks the final state of the SparkApplication submitted by the first task.
+
+The spark-on-k8s operator must already be installed on the Kubernetes cluster:
+https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
+"""
+
+from os import path
+from datetime import timedelta, datetime
+
+# [START import_module]
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+# Operators; we need this to operate!
+from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
+from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
+from airflow.utils.dates import days_ago
+
+# [END import_module]
+
+# [START default_args]
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': days_ago(1),
+    'email': ['airflow@example.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'max_active_runs': 1,
+    'retries': 3
+}
+# [END default_args]
+
+# [START instantiate_dag]
+
+dag = DAG(
+    'spark_pi',
+    default_args=default_args,
+    schedule_interval=None,
+    tags=['example', 'spark']
+)
+
+submit = SparkKubernetesOperator(
+    task_id='spark_pi_submit',
+    namespace='spark-jobs',
+    application_file="example_spark_kubernetes_operator_pi.yaml",
+    kubernetes_conn_id="kubernetes_default",
+    do_xcom_push=True,
+    dag=dag,
+)
+
+sensor = SparkKubernetesSensor(
+    task_id='spark_pi_monitor',
+    namespace='spark-jobs',
+    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
+    kubernetes_conn_id="kubernetes_default",
+    dag=dag,
+    attach_log=True
+)
+
+submit >> sensor
\ No newline at end of file