From 6e8f2c366441fccccdc8069b64fc193a1fbedbd7 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 8 Mar 2024 16:22:00 +0100 Subject: [PATCH] simple test DAG --- airflow/dags/skg_if_pipeline.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py index 5505775..6e8a603 100644 --- a/airflow/dags/skg_if_pipeline.py +++ b/airflow/dags/skg_if_pipeline.py @@ -27,6 +27,13 @@ default_args = { } +def strip_prefix(s, p): + if s.startswith(p): + return s[len(p):] + else: + return s + + @task def unzip_to_s3(key: str, bucket: str): hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False}) @@ -43,7 +50,7 @@ def unzip_to_s3(key: str, bucket: str): if file == key: continue local_file_path = os.path.join(root, file) - hook.load_file(local_file_path, local_file_path.removeprefix(tmp_dir), S3_BUCKET_NAME, replace=True) + hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME, replace=True) @task def bulk_load(entity: str):