diff --git a/airflow/dags/download_to_S3.py b/airflow/dags/download_to_S3.py
new file mode 100644
index 0000000..1645887
--- /dev/null
+++ b/airflow/dags/download_to_S3.py
@@ -0,0 +1,45 @@
+import os
+from datetime import timedelta
+
+import pendulum
+import requests
+from airflow.decorators import dag
+from airflow.decorators import task
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+from common import s3_dowload_unzip_upload
+
+S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
+EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+
+default_args = {
+    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
+    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+}
+
+
+@dag(
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    default_args=default_args,
+    params={
+        "url": "File to download",
+        "dst_key": "key containing the file",
+        "dst_bucket": "bucket that will contain file"
+    },
+    tags=["s3"],
+)
+def download_to_s3():
+    @task
+    def download(**context):
+        hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
+        with requests.get(context["params"]["url"], stream=True) as r:
+            r.raise_for_status()
+            hook.load_file_obj(r.raw, context["params"]["dst_key"], bucket_name=context["params"]["dst_bucket"], replace=True, encrypt=False)
+
+    download()
+
+
+download_to_s3()
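
The DAG above streams a file from a URL straight into S3 via S3Hook.load_file_obj, with the source URL, destination key, and destination bucket supplied as DAG params at trigger time (the param defaults in the decorator are human-readable descriptions, so real values have to be provided with each run). Below is a minimal usage sketch, not part of the diff, showing how such a run might be triggered through the Airflow 2 stable REST API; the base URL, the admin/admin credentials, and the example url/key/bucket values are all placeholders, and it assumes the basic-auth API backend is enabled on the webserver.

# Hedged usage sketch: trigger download_to_s3 with its three params via the
# Airflow 2 stable REST API. All concrete values below are placeholders.
import requests

AIRFLOW_BASE_URL = "http://localhost:8080"  # assumed local webserver

response = requests.post(
    f"{AIRFLOW_BASE_URL}/api/v1/dags/download_to_s3/dagRuns",
    auth=("admin", "admin"),  # placeholder credentials, basic-auth backend assumed
    json={
        "conf": {
            "url": "https://example.com/data.zip",       # file to download
            "dst_key": "raw/data.zip",                   # key that will contain the file
            "dst_bucket": "my-ingest-bucket",            # bucket that will contain the file
        }
    },
)
response.raise_for_status()
print(response.json()["dag_run_id"])  # identifier of the newly created run

The same run can be started from the CLI with `airflow dags trigger download_to_s3 --conf '{"url": "...", "dst_key": "...", "dst_bucket": "..."}'`; either way, assuming the default behaviour where dag-run conf overrides the declared params, the task reads the supplied values through context["params"].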