import os
from datetime import datetime, timedelta

from airflow import settings
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.connection import Connection
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

# Environment-configurable settings; the defaults apply when the variables are unset.
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "zenodo-bucket")
S3_BUCKET_KEY = os.getenv("S3_BUCKET_KEY", "test")
S3_BUCKET_KEY_LIST = os.getenv("S3_BUCKET_KEY_LIST", "test2")
S3_BUCKET_WILDCARD_KEY = os.getenv("S3_BUCKET_WILDCARD_KEY", "test*")
PREFIX = os.getenv("S3_PREFIX", "test")
INACTIVITY_PERIOD = float(os.getenv("INACTIVITY_PERIOD", 5))
AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
LOCAL_FILE_PATH = os.getenv("LOCAL_FILE_PATH", "/usr/local/airflow/dags/example_s3_test_file.txt")
AWS_CONN_ID = os.getenv("ASTRO_AWS_S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
DATA = os.environ.get(
    "DATA",
    """
apple,0.5
milk,2.5
bread,4.0
""",
)

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


@task
def create_connection(conn_id_name: str):
    """Register an HTTP connection for zenodo.org in the Airflow metadata database."""
    # Zenodo is served over HTTPS; embedding the scheme in the host lets HttpHook
    # build the correct base URL. (The original conn_type="https" with port=80
    # would have produced an unreachable endpoint.) Assumes the connection does
    # not already exist in the metadata database.
    conn = Connection(
        conn_id=conn_id_name,
        conn_type="http",
        host="https://zenodo.org",
    )
    session = settings.Session()
    session.add(conn)
    session.commit()
    session.close()


with DAG(
    dag_id="zenodo_download_to_s3",
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "s3"],
) as dag:
    conn_id_name = "zenodo"

    set_up_connection = create_connection(conn_id_name)

    create_bucket = S3CreateBucketOperator(
        task_id="create_bucket",
        region_name=AWS_DEFAULT_REGION,
        bucket_name=S3_BUCKET_NAME,
        aws_conn_id=AWS_CONN_ID,
    )

    # Stream the Zenodo record file directly into S3; replace=True makes reruns idempotent.
    http_to_s3_task = HttpToS3Operator(
        task_id="http_to_s3_task",
        http_conn_id=conn_id_name,
        endpoint="/records/8223812/files/organization.tar",
        s3_bucket=S3_BUCKET_NAME,
        s3_key="organization.tar",
        replace=True,
        aws_conn_id=AWS_CONN_ID,
    )

    chain(set_up_connection, create_bucket, http_to_s3_task)
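

# A minimal optional verification step (an assumption, not part of the original
# example): an S3KeySensor that waits until the uploaded archive is visible in
# the bucket. It attaches to the DAG defined above via the explicit `dag=dag`
# argument, and the bucket/connection names reuse the constants already in this file.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

check_upload = S3KeySensor(
    task_id="check_upload",
    bucket_name=S3_BUCKET_NAME,
    bucket_key="organization.tar",
    aws_conn_id=AWS_CONN_ID,
    dag=dag,
)

# Run the check only after the transfer task has finished.
http_to_s3_task >> check_upload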