From 6d3af5e50d5b6ec944c1d0773d932ab8093b3ace Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Wed, 12 Jun 2024 22:56:17 +0200
Subject: [PATCH] initial stage

---
 airflow/dags/antispam-batch.py | 98 ++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)
 create mode 100644 airflow/dags/antispam-batch.py

diff --git a/airflow/dags/antispam-batch.py b/airflow/dags/antispam-batch.py
new file mode 100644
index 0000000..8f32e93
--- /dev/null
+++ b/airflow/dags/antispam-batch.py
@@ -0,0 +1,98 @@
+"""Airflow DAG that runs the antispam batch checker in a Kubernetes pod.
+
+The OpenSearch endpoint is read from the ``opensearch_default`` Airflow
+connection; the OpenSearch credentials reach the pod as environment
+variables sourced from a Kubernetes secret.
+"""
+import os
+from datetime import timedelta, datetime
+
+import pendulum
+from airflow import DAG
+from airflow.hooks.base import BaseHook
+from airflow.operators.python import PythonOperator
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.secret import Secret
+
+# Kubernetes secret holding the OpenSearch credentials.
+OPENSEARCH_SECRET = 'opensearch-conn-secrets'
+
+default_args = {
+    "execution_timeout": timedelta(days=6),
+    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
+    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+}
+
+# Looked up while the DAG file is parsed, so the connection must already exist.
+conn = BaseHook.get_connection("opensearch_default")
+
+dag = DAG(
+    'antispam_batch_check',
+    default_args=default_args,
+    # Only `schedule` is passed: the deprecated `schedule_interval` used to be
+    # given alongside it with a conflicting value (one day).
+    # NOTE(review): use timedelta(days=1) here if a daily run was intended.
+    schedule=None,
+    dagrun_timeout=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+)
+
+
+def print_hello():
+    """Print a greeting to the task log."""
+    print("Hello World!")
+
+
+secrets = [
+    Secret(
+        deploy_type='env',
+        deploy_target='CURATION_OPENSEARCH__USER',
+        secret=OPENSEARCH_SECRET,
+        key='username',
+    ),
+    Secret(
+        deploy_type='env',
+        deploy_target='CURATION_OPENSEARCH__PASSWORD',
+        secret=OPENSEARCH_SECRET,
+        key='password',
+    ),
+]
+
+# Define the KubernetesPodOperator
+task = KubernetesPodOperator(
+    task_id='antispam_checker',
+    name='antispam_checker',
+    namespace='kg-airflow',
+    image='gbloisi/curator:1.0.0',
+    cmds=['python3'],
+    arguments=[
+        '/antispam-batch.py',
+        "--opensearch.host", conn.host,
+        # The port is the connection's port field; the password only reaches
+        # the pod through the secret-backed environment variable.
+        "--opensearch.port", str(conn.port),
+        "--openai.host", "local-ai.kg-airflow.dev-1.eosc.intranet",
+        "--openai.port", "8000",
+        "--parallelism", "8",
+    ],
+    # NOTE(review): recent cncf.kubernetes provider releases expect
+    # `container_resources` instead of this dict - confirm installed version.
+    resources={'request_cpu': '100m', 'request_memory': '256Mi'},
+    secrets=secrets,
+    is_delete_operator_pod=True,
+    in_cluster=True,
+    get_logs=True,
+    dag=dag,
+)
+
+# print_hello is a plain function; it must be wrapped in an operator before
+# it can take part in a dependency chain.
+hello_task = PythonOperator(
+    task_id='print_hello',
+    python_callable=print_hello,
+    dag=dag,
+)
+
+# Set the task dependencies
+task >> hello_task