simple test DAG

Giambattista Bloisi 2024-03-08 16:01:11 +01:00
parent 1ad289e948
commit 7edb0c5a7e
1 changed file with 12 additions and 8 deletions

@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import gzip
 import io
 import json
@@ -11,6 +13,7 @@ from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.file import TemporaryDirectory

 S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
@@ -28,15 +31,16 @@ default_args = {
 def unzip_to_s3(key: str, bucket: str):
     hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
-    tarball_obj = hook.get_key(key, bucket_name=bucket)
-    with zipfile.ZipFile(file=tarball_obj.get()['Body']) as z:
-        for member in z.filelist:
-            if member.is_dir:
-                continue
-            with z.open(member.filename) as fd:
-                hook.load_file_obj(BytesIO(fd.read()), member.filename, S3_BUCKET_NAME)
+    with TemporaryDirectory() as tmp_dir:
+        archive = f'{tmp_dir}/{key}'
+        hook.download_file(key, bucket_name=bucket, local_path=archive)
+        with zipfile.ZipFile(archive, 'r') as zip_ref:
+            zip_ref.extractall(tmp_dir)
+        for root, _, files in os.walk(tmp_dir):
+            for file in files:
+                local_file_path = os.path.join(root, file)
+                hook.load_file(local_file_path, local_file_path.lstrip(tmp_dir), S3_BUCKET_NAME)

 @task
 def bulk_load(entity: str):
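
Two caveats worth flagging in the hunk above. In the removed streaming variant, member.is_dir is referenced but never called; a bound method is always truthy, so the continue skips every entry. In the added variant, local_file_path.lstrip(tmp_dir) treats its argument as a character set rather than a prefix, so leading characters of the intended S3 key can be stripped away. Below is a minimal sketch of the same download-extract-upload flow that derives the key with os.path.relpath instead. The function name, the dest_bucket parameter, and the use of stdlib tempfile.TemporaryDirectory (in place of the Airflow helper imported above) are illustrative assumptions, as is letting S3Hook.download_file choose the local file name and return its path, which recent amazon providers do.

import os
import tempfile
import zipfile

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def unzip_to_s3_relpath(hook: S3Hook, key: str, bucket: str, dest_bucket: str) -> None:
    # Hypothetical rework of unzip_to_s3: same download-extract-upload flow,
    # with the S3 key derived as a path relative to the temp directory.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Pass the directory and let the hook name the file; this avoids
        # pre-creating subdirectories when `key` itself contains slashes.
        archive = hook.download_file(key, bucket_name=bucket, local_path=tmp_dir)
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(tmp_dir)
        os.remove(archive)  # keep the walk below from re-uploading the archive
        for root, _, files in os.walk(tmp_dir):
            for file in files:
                local_file_path = os.path.join(root, file)
                # relpath strips tmp_dir as a path prefix, not as a character
                # set, so a key like 'part-000.json' comes through intact.
                s3_key = os.path.relpath(local_file_path, tmp_dir)
                hook.load_file(local_file_path, s3_key, dest_bucket)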