initial stage

Giambattista Bloisi 2024-03-25 17:52:56 +01:00
parent c07ddc03d9
commit 4e1955b673
2 changed files with 89 additions and 0 deletions

airflow/dags/S3_unzip.py (new file)

@@ -0,0 +1,43 @@
import os
from datetime import timedelta

import pendulum
from airflow.decorators import dag, task

from common import s3_download_unzip_upload

S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", "6"))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", "1")),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", "60"))),
}


@dag(
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "zipfile": "File to unzip",
        "src_bucket": "bucket containing the zip file",
        "dst_bucket": "bucket that will contain unzipped files",
    },
    tags=["s3"],
)
def s3_unzip():
    @task
    def unzip(**context):
        # Download the archive from src_bucket, extract it and upload the
        # extracted files to dst_bucket.
        s3_download_unzip_upload(S3_CONN_ID,
                                 context["params"]["zipfile"],
                                 context["params"]["src_bucket"],
                                 context["params"]["dst_bucket"])

    unzip()


s3_unzip()
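The DAG has no schedule, so it only runs when triggered manually with the three parameters filled in. A minimal trigger from the Airflow CLI might look like this (the bucket and file names are hypothetical):

airflow dags trigger s3_unzip --conf '{"zipfile": "dump.zip", "src_bucket": "raw-zips", "dst_bucket": "unzipped-data"}'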

airflow/dags/common.py (new file)

@@ -0,0 +1,46 @@
import os
import zipfile
from tempfile import TemporaryDirectory

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def strip_prefix(s, p):
    # Return s without the leading prefix p, if present.
    if s.startswith(p):
        return s[len(p):]
    return s


def s3_download_unzip_upload(s3conn: str, src_key: str, src_bucket: str, dest_bucket: str):
    hook = S3Hook(s3conn, transfer_config_args={'use_threads': False})
    with TemporaryDirectory() as dwl_dir, TemporaryDirectory() as tmp_dir:
        # download_file with preserve_file_name=True stores the object under
        # the basename of its key, so the local archive path must match that.
        archive = os.path.join(dwl_dir, os.path.basename(src_key))
        hook.download_file(key=src_key, bucket_name=src_bucket, local_path=dwl_dir,
                           preserve_file_name=True, use_autogenerated_subdir=False)
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(tmp_dir)
        # Upload every extracted file under its path relative to tmp_dir;
        # the trailing separator is stripped so keys do not start with '/'.
        for root, _, files in os.walk(tmp_dir):
            for file in files:
                local_file_path = os.path.join(root, file)
                hook.load_file(local_file_path,
                               strip_prefix(local_file_path, tmp_dir + os.sep),
                               dest_bucket,
                               replace=True)
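The destination keys mirror the archive's internal layout: stripping tmp_dir plus the path separator from each extracted path leaves a relative key. A quick self-contained sketch of that behaviour (the directory and file names below are hypothetical):

import os

def strip_prefix(s, p):
    return s[len(p):] if s.startswith(p) else s

tmp_dir = "/tmp/abc123"  # stand-in for the extraction directory
local_file_path = os.path.join(tmp_dir, "data", "part-0001.json")
# The object is uploaded as "data/part-0001.json", not "/data/part-0001.json".
assert strip_prefix(local_file_path, tmp_dir + os.sep) == os.path.join("data", "part-0001.json")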