From 8e7613625eb2bf15be4f29527accf1d533d2229d Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Mon, 10 Jun 2024 13:58:32 +0200 Subject: [PATCH] initial stage --- airflow/dags/S3_untar.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/dags/S3_untar.py b/airflow/dags/S3_untar.py index 1d82477..7a4d65d 100644 --- a/airflow/dags/S3_untar.py +++ b/airflow/dags/S3_untar.py @@ -35,7 +35,7 @@ def s3_untar(): def untar(**context): hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False}) s3_obj = hook.get_key(context["params"]["src_key"], bucket_name=context["params"]["src_bucket"]) - with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r:*') as tar: + with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r|*') as tar: for member in tar: dst_key = context["params"]["dst_key_prefix"] + "/" + member.name dst_key = os.path.normpath(dst_key) @@ -44,7 +44,9 @@ def s3_untar(): print(f"Skipping {member.name} to {dst_key}") continue print(f"Extracting {member.name} to {dst_key}") - hook.load_file_obj(tar.extractfile(member), + fileobj = tar.extractfile(member) + fileobj.seekable = lambda: False + hook.load_file_obj(fileobj, dst_key, context["params"]["dst_bucket"], replace=True)