import gzip
import io
import json
import os
import zipfile
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain
from opensearchpy import OpenSearch, helpers

from EOSC_indexes import mappings


def strip_prefix(s, p):
    # Remove prefix p from s if present (equivalent to str.removeprefix,
    # which is only available on Python 3.9+).
    if s.startswith(p):
        return s[len(p):]
    return s


def s3_dowload_unzip_upload(s3conn: str, src_key: str, src_bucket: str, dest_bucket: str):
    # Download a zip archive from src_bucket, extract it into a temporary
    # directory, and upload every extracted file to dest_bucket, preserving
    # each file's path relative to the extraction root.
    hook = S3Hook(s3conn, transfer_config_args={'use_threads': False})
    with TemporaryDirectory() as dwl_dir:
        with TemporaryDirectory() as tmp_dir:
            # download_file with preserve_file_name=True saves the object
            # under its base name, so this path assumes src_key contains
            # no '/' components.
            archive = f'{dwl_dir}/{src_key}'
            hook.download_file(key=src_key,
                               bucket_name=src_bucket,
                               local_path=dwl_dir,
                               preserve_file_name=True,
                               use_autogenerated_subdir=False)
            with zipfile.ZipFile(archive, 'r') as zip_ref:
                zip_ref.extractall(tmp_dir)
            for root, _, files in os.walk(tmp_dir):
                for file in files:
                    local_file_path = os.path.join(root, file)
                    # Upload under the key obtained by stripping the
                    # temporary extraction directory from the local path.
                    hook.load_file(local_file_path,
                                   strip_prefix(local_file_path, tmp_dir + "/"),
                                   dest_bucket,
                                   replace=True)
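
# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal DAG wiring s3_dowload_unzip_upload into a single task, assuming
# Airflow 2.4+ TaskFlow syntax. The dag_id, connection id, bucket names, and
# key below are hypothetical placeholders; substitute your own Airflow
# connection and S3 buckets.

@dag(
    dag_id="example_s3_unzip_copy",  # hypothetical dag id
    schedule=None,                   # run only when triggered manually
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
)
def example_s3_unzip_copy():
    @task
    def unzip_and_copy():
        s3_dowload_unzip_upload(
            s3conn="s3_conn",            # hypothetical Airflow connection id
            src_key="dump.zip",          # hypothetical key (no '/' components)
            src_bucket="source-bucket",  # hypothetical source bucket
            dest_bucket="dest-bucket",   # hypothetical destination bucket
        )

    unzip_and_copy()


example_s3_unzip_copy()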