diff --git a/airflow/dags/skg_if_pipeline.py b/airflow/dags/skg_if_pipeline.py
index 9ac0d67..3ad3939 100644
--- a/airflow/dags/skg_if_pipeline.py
+++ b/airflow/dags/skg_if_pipeline.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import gzip
 import io
 import json
@@ -11,6 +13,7 @@ from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.file import TemporaryDirectory
 
 S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
@@ -28,15 +31,16 @@ default_args = {
 def unzip_to_s3(key: str, bucket: str):
     hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
-    tarball_obj = hook.get_key(key, bucket_name=bucket)
-
-    with zipfile.ZipFile(file=tarball_obj.get()['Body']) as z:
-        for member in z.filelist:
-            if member.is_dir:
-                continue
-            with z.open(member.filename) as fd:
-                hook.load_file_obj(BytesIO(fd.read()), member.filename, S3_BUCKET_NAME)
+    with TemporaryDirectory() as tmp_dir:
+        archive = f'{tmp_dir}/{key}'
+        hook.download_file(key, bucket_name=bucket, local_path=archive)
+        with zipfile.ZipFile(archive, 'r') as zip_ref:
+            zip_ref.extractall(tmp_dir)
+        for root, _, files in os.walk(tmp_dir):
+            for file in files:
+                local_file_path = os.path.join(root, file)
+                hook.load_file(local_file_path, local_file_path.lstrip(tmp_dir), S3_BUCKET_NAME)
 
 @task
 def bulk_load(entity: str):
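
For reference, a minimal standalone sketch of the refactored task, using the stdlib tempfile.TemporaryDirectory in place of the Airflow helper imported in the diff. The dest_bucket parameter and the "aws_default" connection id are illustrative assumptions, not names taken from this DAG. The sketch also departs from the diff in two places: S3Hook.download_file() treats local_path as a target directory and returns the path it actually wrote, so that return value is used to locate the archive, and str.lstrip() removes a set of characters rather than a prefix, so os.path.relpath() is used to derive the destination key.

import os
import tempfile
import zipfile

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def unzip_to_s3(key: str, bucket: str, dest_bucket: str, aws_conn_id: str = "aws_default") -> None:
    """Download a zip archive from S3, extract it locally, and upload each extracted file."""
    hook = S3Hook(aws_conn_id, transfer_config_args={"use_threads": False})
    with tempfile.TemporaryDirectory() as tmp_dir:
        # download_file() writes into tmp_dir and returns the local path of the downloaded object.
        archive = hook.download_file(key, bucket_name=bucket, local_path=tmp_dir)
        with zipfile.ZipFile(archive, "r") as zip_ref:
            zip_ref.extractall(tmp_dir)
        for root, _, files in os.walk(tmp_dir):
            for name in files:
                local_file_path = os.path.join(root, name)
                if local_file_path == archive:
                    continue  # skip the archive itself, only upload extracted members
                # The path relative to tmp_dir becomes the destination key.
                dest_key = os.path.relpath(local_file_path, tmp_dir)
                hook.load_file(local_file_path, dest_key, bucket_name=dest_bucket, replace=True)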