simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-08 16:22:00 +01:00
parent d281fb043a
commit 6e8f2c3664
1 changed files with 8 additions and 1 deletions

View File

@ -27,6 +27,13 @@ default_args = {
}
def strip_prefix(s, p):
if s.startswith(p):
return s[len(p):]
else:
return s
@task
def unzip_to_s3(key: str, bucket: str):
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
@ -43,7 +50,7 @@ def unzip_to_s3(key: str, bucket: str):
if file == key:
continue
local_file_path = os.path.join(root, file)
hook.load_file(local_file_path, local_file_path.removeprefix(tmp_dir), S3_BUCKET_NAME, replace=True)
hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME, replace=True)
@task
def bulk_load(entity: str):