diff --git a/airflow/dags/create_bucket.py b/airflow/dags/create_bucket.py new file mode 100644 index 0000000..3643d60 --- /dev/null +++ b/airflow/dags/create_bucket.py @@ -0,0 +1,59 @@ +import os +from datetime import datetime, timedelta +from typing import Any, List + +from airflow.models.dag import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.operators.s3 import ( + S3CreateBucketOperator, + S3CreateObjectOperator, + S3DeleteBucketOperator, +) +from airflow.utils.state import State +from airflow.utils.trigger_rule import TriggerRule + +S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "test-bucket-astronomer-providers") +S3_BUCKET_KEY = os.getenv("S3_BUCKET_KEY", "test") +S3_BUCKET_KEY_LIST = os.getenv("S3_BUCKET_KEY_LIST", "test2") +S3_BUCKET_WILDCARD_KEY = os.getenv("S3_BUCKET_WILDCARD_KEY", "test*") +PREFIX = os.getenv("S3_PREFIX", "test") +INACTIVITY_PERIOD = float(os.getenv("INACTIVITY_PERIOD", 5)) +AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1") +LOCAL_FILE_PATH = os.getenv("LOCAL_FILE_PATH", "/usr/local/airflow/dags/example_s3_test_file.txt") +AWS_CONN_ID = os.getenv("ASTRO_AWS_S3_CONN_ID", "s3_conn") +EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) +DATA = os.environ.get( + "DATA", + """ +apple,0.5 +milk,2.5 +bread,4.0 +""", +) + +default_args = { + "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT), + "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)), + "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))), +} + + +with DAG( + dag_id="example_s3_sensor", + schedule=None, + start_date=datetime(2021, 1, 1), + catchup=False, + default_args=default_args, + tags=["example", "async", "s3"], +) as dag: + create_bucket = S3CreateBucketOperator( + task_id="create_bucket", + region_name=AWS_DEFAULT_REGION, + bucket_name=S3_BUCKET_NAME, + aws_conn_id=AWS_CONN_ID, + ) + + + ( + create_bucket + ) \ No newline at end of file