lot1-kickoff/airflow/dags/openaire_to_S3.py

import os
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
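
# The S3 connection ID and the task timeout/retry defaults are read from
# environment variables so they can be tuned per deployment.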
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", "6"))
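
# Defaults applied to every task in this DAG.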
default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", "1")),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", "60"))),
}
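

# Manually triggered DAG ("schedule=None"): "file" and "dst_bucket" are supplied
# at trigger time. The string values in "params" are placeholder descriptions
# that Airflow shows as defaults in the trigger form.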
@dag(
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "file": "File to download",
        "dst_bucket": "bucket that will contain file",
    },
    tags=["s3"],
)
def openaire_to_s3():
    @task
    def download(**context):
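        # OpenAIRE credentials come from the "openaire_default" Airflow
        # connection and are sent as HTTP Basic auth on every request made
        # through this session.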
        conn = BaseHook.get_connection("openaire_default")
        session = requests.Session()
        session.auth = (conn.login, conn.password)
        hook = S3Hook(S3_CONN_ID, transfer_config_args={"use_threads": False})
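        # Stream the download and hand the raw, undecoded response body
        # straight to S3, so the file is never fully buffered in memory.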
        with session.get(
            "https://" + conn.host + "/data/graph/" + context["params"]["file"],
            stream=True,
        ) as r:
            r.raise_for_status()
            hook.load_file_obj(
                r.raw,
                "/data/graph/" + context["params"]["file"],
                bucket_name=context["params"]["dst_bucket"],
                replace=True,
                encrypt=False,
            )

    download()


openaire_to_s3()