diff --git a/airflow/dags/common.py b/airflow/dags/common.py index cd12b0a..a4af8a4 100644 --- a/airflow/dags/common.py +++ b/airflow/dags/common.py @@ -1,21 +1,8 @@ - -import gzip -import io -import json import os import zipfile -from datetime import timedelta -from airflow.decorators import dag -from airflow.decorators import task -from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.utils.file import TemporaryDirectory -from airflow.utils.helpers import chain -from airflow.models import Variable - -from opensearchpy import OpenSearch, helpers -from EOSC_indexes import mappings def strip_prefix(s, p): @@ -34,13 +21,8 @@ def s3_dowload_unzip_upload(s3conn: str, src_key: str, src_bucket: str, dest_buc hook.download_file(key=src_key, bucket_name=src_bucket, local_path=dwl_dir, preserve_file_name=True, use_autogenerated_subdir=False) with zipfile.ZipFile(archive, 'r') as zip_ref: - zip_ref.extractall(tmp_dir) - - for root, _, files in os.walk(tmp_dir): - for file in files: - local_file_path = os.path.join(root, file) - hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir + "/"), dest_bucket, - replace=True) - + for info in zip_ref.infolist(): + with zip_ref.open(info.filename) as file: + hook.load_file_obj(file, info.filename, dest_bucket, replace=True)