import os
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", "6"))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", "1")),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", "60"))),
}

with DAG(
    dag_id="openaire_to_s3",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "file": "File to download",
        "dst_bucket": "bucket that will contain file",
    },
    tags=["s3"],
) as dag:
    # Download the requested file over HTTP from the OpenAIRE graph endpoint
    # and upload it to the destination S3 bucket under the same key.
    HttpToS3Operator(
        task_id="http_to_s3_task",
        http_conn_id="openaire_default",
        endpoint="/data/graph/{{ params.file }}",
        aws_conn_id=S3_CONN_ID,
        s3_bucket="{{ params.dst_bucket }}",
        s3_key="/data/graph/{{ params.file }}",
        replace=True,
    )
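
# A minimal sketch of how this DAG might be triggered at runtime, since it has
# no schedule and expects "file" and "dst_bucket" params. The file name and
# bucket below are illustrative assumptions, not values from this module:
#
#   airflow dags trigger openaire_to_s3 \
#       --conf '{"file": "dump.tar.gz", "dst_bucket": "my-graph-bucket"}'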