initial stage

This commit is contained in:
Giambattista Bloisi 2024-06-10 13:58:32 +02:00
parent 2e72b11447
commit 8e7613625e
1 changed files with 4 additions and 2 deletions

View File

@ -35,7 +35,7 @@ def s3_untar():
def untar(**context):
hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
s3_obj = hook.get_key(context["params"]["src_key"], bucket_name=context["params"]["src_bucket"])
with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r:*') as tar:
with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r|*') as tar:
for member in tar:
dst_key = context["params"]["dst_key_prefix"] + "/" + member.name
dst_key = os.path.normpath(dst_key)
@ -44,7 +44,9 @@ def s3_untar():
print(f"Skipping {member.name} to {dst_key}")
continue
print(f"Extracting {member.name} to {dst_key}")
hook.load_file_obj(tar.extractfile(member),
fileobj = tar.extractfile(member)
fileobj.seekable = lambda: False
hook.load_file_obj(fileobj,
dst_key,
context["params"]["dst_bucket"],
replace=True)