From b3a9ad8342fd25b94153e278ad98ee88c076cc02 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Fri, 7 Jun 2024 18:10:48 +0200
Subject: [PATCH] initial stage

---
 airflow/dags/openaire_to_S3.py | 44 ++++++++++++++++++++++++++++++++++
 1 file changed, 44 insertions(+)
 create mode 100644 airflow/dags/openaire_to_S3.py

diff --git a/airflow/dags/openaire_to_S3.py b/airflow/dags/openaire_to_S3.py
new file mode 100644
index 0000000..3db9d59
--- /dev/null
+++ b/airflow/dags/openaire_to_S3.py
@@ -0,0 +1,44 @@
+import os
+from datetime import timedelta
+
+import pendulum
+from airflow.decorators import dag
+from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
+
+S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
+EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+
+default_args = {
+    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
+    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
+}
+
+
+@dag(
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    default_args=default_args,
+    params={
+        "file": "File to download",
+        "dst_bucket": "bucket that will contain the file",
+    },
+    tags=["s3"],
+)
+def openaire_to_s3():
+    # Download the requested file from the OpenAIRE HTTP endpoint and upload
+    # it to S3. "file" and "dst_bucket" are resolved per run through Jinja
+    # templating, since DAG params are not available at parse time.
+    HttpToS3Operator(
+        task_id="http_to_s3_task",
+        http_conn_id="openaire_conn",
+        endpoint="/data/graph/{{ params.file }}",
+        aws_conn_id=S3_CONN_ID,
+        s3_bucket="{{ params.dst_bucket }}",
+        s3_key="/data/graph/{{ params.file }}",
+        replace=True,
+    )
+
+
+openaire_to_s3()
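
Note: for a quick local check of the templated params, a minimal sketch using DAG.test() (available since Airflow 2.5). The file name and bucket below are hypothetical placeholders; running it still requires a working "openaire_conn" HTTP connection and valid S3 credentials on the configured connection.

    # Hypothetical smoke test: load the DAG from the dags folder and run it
    # once with example params ("graph_dump.tar" / "openaire-dumps" are
    # placeholders, not real OpenAIRE artifact names).
    from airflow.models.dagbag import DagBag

    dag = DagBag(dag_folder="airflow/dags").get_dag("openaire_to_s3")
    dag.test(run_conf={"file": "graph_dump.tar", "dst_bucket": "openaire-dumps"})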