From f1cec0cfebdad20e0dad4c9a306384f80bcc4d7d Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 15 Mar 2024 12:44:19 +0100 Subject: [PATCH] simple test DAG --- airflow/dags/skg_if_pipeline.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py index c4add6c..325a61d 100644 --- a/airflow/dags/skg_if_pipeline.py +++ b/airflow/dags/skg_if_pipeline.py @@ -46,11 +46,6 @@ def strip_prefix(s, p): return s -def split_list(list_a, chunk_size): - for i in range(0, len(list_a), chunk_size): - yield list_a[i:i + chunk_size] - - @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), @@ -113,9 +108,13 @@ def skg_if_pipeline(): hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False}) keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/') for key in keys: - pieces.append((entity, key)) + pieces.append({entity, key}) - return list(split_list(pieces, BULK_PARALLELISM)) + def split_list(list_a, chunk_size): + for i in range(0, len(list_a), chunk_size): + yield {"files": list_a[i:i + chunk_size]} + + return list(split_list(pieces, len(pieces)/BULK_PARALLELISM)) @task def bulk_load(files: list[(str, str)]):