From 031b11a3dbf26ed8811771f118199631c03ba0cb Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 8 Mar 2024 17:20:37 +0100 Subject: [PATCH] simple test DAG --- airflow/dags/skg_if_pipeline.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py index 85acc59..cf62e4d 100644 --- a/airflow/dags/skg_if_pipeline.py +++ b/airflow/dags/skg_if_pipeline.py @@ -34,7 +34,6 @@ def strip_prefix(s, p): return s -@task def unzip_to_s3(key: str, bucket: str): hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False}) @@ -54,7 +53,6 @@ def unzip_to_s3(key: str, bucket: str): return "" -@task def bulk_load(entity: str): session = requests.Session() session.auth = ("admin", "admin") @@ -91,6 +89,11 @@ def bulk_load(entity: str): verify=False) return "" +@task +def do_all(entity: str): + unzip_to_s3("dump.zip", S3_BUCKET_NAME) + bulk_load("research_product") + with DAG( dag_id="skg_if_pipeline", @@ -100,12 +103,9 @@ with DAG( default_args=default_args, tags=["example", "async", "s3"], ) as dag: - unzip_to_s3 = unzip_to_s3("dump.zip", S3_BUCKET_NAME) - bulk_load_rp = bulk_load("research_product") + all_done = do_all("") - chain(unzip_to_s3, - bulk_load_rp - ) + chain(all_done)