simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-10 12:58:45 +01:00
parent 7cfae9f1bc
commit e81e28f5f9
1 changed files with 31 additions and 4 deletions

View File

@ -51,9 +51,37 @@ default_args = {
'max_active_runs': 1,
'retries': 3
}
# [END default_args]
# [START instantiate_dag]
spec = {'apiVersion': 'sparkoperator.k8s.io/v1beta2',
'kind': 'SparkApplication',
'metadata': {
'name': 'spark-pi',
'namespace': 'lot1-spark-jobs'
},
'spec': {
'type': 'Scala',
'mode': 'cluster',
'image': 'apache/spark:v3.1.3',
'imagePullPolicy': 'Always',
'mainApplicationFile': 'local:///opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar',
'mainClass': 'org.apache.spark.examples.SparkPi',
'sparkVersion': '3.1.3',
'restartPolicy': {'type': 'Never'},
'arguments': ['{{ds}}'],
'driver': {
'coreLimit': '1200m',
'cores': 1,
'labels': {'version': '3.1.3'},
'memory': '1g',
'serviceAccount': 'spark',
},
'executor': {
'cores': 1,
'instances': 1,
'memory': '512mg',
'labels': {'version': '3.1.3'}
}
}}
dag = DAG(
'spark_pi',
@ -65,14 +93,13 @@ dag = DAG(
submit = SparkKubernetesOperator(
task_id='spark_pi_submit',
namespace='lot1-spark-jobs',
application_file="/example_spark_kubernetes_operator_pi.yaml",
template_spec=spec,
kubernetes_conn_id="kubernetes_default",
do_xcom_push=True,
in_cluster=True,
delete_on_termination=True,
dag=dag
)
submit.application_file = "/example_spark_kubernetes_operator_pi.yaml"
sensor = SparkKubernetesSensor(
task_id='spark_pi_monitor',