From 1ad289e9480ecd038128b092e767394a74740cac Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Fri, 8 Mar 2024 15:35:45 +0100
Subject: [PATCH] simple test DAG

---
 airflow/dags/skg_if_pipeline.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py
index 389e604..9ac0d67 100644
--- a/airflow/dags/skg_if_pipeline.py
+++ b/airflow/dags/skg_if_pipeline.py
@@ -17,13 +17,13 @@
 S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
 AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
-
 
 default_args = {
     "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
     "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
     "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
 }
+
 @task
 def unzip_to_s3(key: str, bucket: str):
     hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
@@ -37,6 +37,7 @@ def unzip_to_s3(key: str, bucket: str):
         with z.open(member.filename) as fd:
             hook.load_file_obj(BytesIO(fd.read()), member.filename, S3_BUCKET_NAME)
 
+
 @task
 def bulk_load(entity: str):
     session = requests.Session()
@@ -73,6 +74,7 @@ def bulk_load(entity: str):
     session.post(f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh',
                  verify=False)
 
+
 with DAG(
     dag_id="skg_if_pipeline",
     schedule=None,