diff --git a/airflow/dags/S3_untar.py b/airflow/dags/S3_untar.py
new file mode 100644
index 0000000..d8c3d88
--- /dev/null
+++ b/airflow/dags/S3_untar.py
@@ -0,0 +1,55 @@
+import os
+import tarfile
+from datetime import timedelta
+
+import pendulum
+from airflow.decorators import dag
+from airflow.decorators import task
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+from common import s3_dowload_unzip_upload
+
+S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
+EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+
+default_args = {
+    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
+    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+}
+
+
+@dag(
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    default_args=default_args,
+    params={
+        "src_key": "File to untar",
+        "src_bucket": "bucket containing the tar file",
+        "dst_key_prefix": "",
+        "dst_bucket": "bucket that will contain the untarred files"
+    },
+    tags=["s3"],
+)
+def s3_untar():
+    @task
+    def untar(**context):
+        hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
+        s3_obj = hook.get_key(context["params"]["src_key"], bucket_name=context["params"]["src_bucket"])
+        # The S3 response body is a non-seekable stream, so open the archive in
+        # tar streaming mode ('r|*') instead of random-access mode ('r').
+        with tarfile.open(fileobj=s3_obj.get()["Body"], mode='r|*') as tar:
+            for member in tar:
+                # Directories and other special members carry no file payload.
+                if not member.isfile():
+                    continue
+                hook.load_file_obj(tar.extractfile(member),
+                                   context["params"]["dst_key_prefix"] + member.name,
+                                   context["params"]["dst_bucket"],
+                                   replace=True)
+
+    untar()
+
+
+s3_untar()
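
For reference, the `params` values declared in the DAG above are only placeholder descriptions; the real bucket and key values are supplied at trigger time as the run `conf`. Below is a minimal sketch of such a trigger against the Airflow 2 stable REST API; the host, credentials, bucket names, and keys are illustrative placeholders and not part of this change.

```python
# Hypothetical trigger for the s3_untar DAG via the Airflow 2 stable REST API.
# Host, credentials, bucket names, and keys are placeholders for illustration.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/s3_untar/dagRuns",
    auth=("admin", "admin"),  # deployment-specific auth backend assumed
    json={
        "conf": {
            "src_key": "dumps/graph.tar",    # the tar object to unpack
            "src_bucket": "raw-dumps",       # bucket holding the tar file
            "dst_key_prefix": "extracted/",  # prefix prepended to member names
            "dst_bucket": "processed",       # bucket receiving the unpacked objects
        }
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```

The same `conf` payload can equivalently be passed on the command line with `airflow dags trigger s3_untar -c '<json>'`.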