from __future__ import annotations

import gzip
import io
import json
import os
import zipfile
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory

S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

# Base URL of the OpenSearch cluster that receives the SKG-IF entities.
OPENSEARCH_URL = "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200"

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


def strip_prefix(s: str, p: str) -> str:
    """Return `s` without the leading prefix `p`, or `s` unchanged if it does not start with `p`."""
    if s.startswith(p):
        return s[len(p):]
    return s


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "s3"],
)
def skg_if_pipeline():
    @task
    def unzip_to_s3(key: str, bucket: str):
        """Download the SKG-IF dump archive from S3, unpack it locally and upload its files back to S3."""
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={"use_threads": False})
        with TemporaryDirectory() as dwl_dir, TemporaryDirectory() as tmp_dir:
            # Download the zip archive into its own directory, keeping the original file name.
            archive = f"{dwl_dir}/{key}"
            hook.download_file(
                key=key,
                bucket_name=bucket,
                local_path=dwl_dir,
                preserve_file_name=True,
                use_autogenerated_subdir=False,
            )
            with zipfile.ZipFile(archive, "r") as zip_ref:
                zip_ref.extractall(tmp_dir)
            # Upload every extracted file, keyed by its path relative to the extraction
            # directory (e.g. "datasource/part-0.gz") so that bulk_load can list it by
            # entity prefix. A file named like the archive itself is skipped.
            for root, _, files in os.walk(tmp_dir):
                for file in files:
                    if file == key:
                        continue
                    local_file_path = os.path.join(root, file)
                    hook.load_file(
                        local_file_path,
                        strip_prefix(local_file_path, f"{tmp_dir}/"),
                        S3_BUCKET_NAME,
                        replace=True,
                    )
        return ""

    @task
    def bulk_load(entity: str):
        """Recreate the `{entity}_index` OpenSearch index and load every gzipped JSONL file for that entity."""
        session = requests.Session()
        session.auth = ("admin", "admin")
        # Drop and recreate the index; automatic refresh is disabled during the load
        # and triggered explicitly once at the end.
        session.delete(f"{OPENSEARCH_URL}/{entity}_index", verify=False)
        session.put(
            f"{OPENSEARCH_URL}/{entity}_index",
            data=json.dumps(
                {
                    "settings": {
                        "index.number_of_shards": 1,
                        "index.number_of_replicas": 1,
                        "index.refresh_interval": -1,
                    }
                }
            ),
            headers={"Content-Type": "application/json"},
            verify=False,
        )
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={"use_threads": False})
        keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f"{entity}/")
        print(keys)
        for key in keys:
            print(key)
            # Stream each gzipped JSON Lines file straight from S3 and index one document
            # per line, using the record's local_identifier as the document id.
            s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
            with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
                buff = io.BufferedReader(gzipfile)
                for line in buff:
                    data = json.loads(line)
                    doc_id = requests.utils.quote(data["local_identifier"], safe="")
                    session.post(
                        f"{OPENSEARCH_URL}/{entity}_index/_doc/{doc_id}?refresh=false",
                        data=json.dumps(data),
                        headers={"Content-Type": "application/json"},
                        verify=False,
                    )
        # Make the newly indexed documents searchable.
        session.post(f"{OPENSEARCH_URL}/{entity}_index/_refresh", verify=False)

    # Note: no explicit dependency is declared between unzip_to_s3 and the bulk_load tasks.
    unzip_to_s3("dump.zip", S3_BUCKET_NAME)
    bulk_load("datasource")
    bulk_load("grant")
    bulk_load("organization")
    bulk_load("persons")
    bulk_load("research_product")
    bulk_load("topic")
    bulk_load("venues")


skg_if_pipeline()
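
# ---------------------------------------------------------------------------
# Optional sketch, not wired into the DAG above: the per-document POSTs in
# bulk_load could be batched through OpenSearch's standard `_bulk` NDJSON
# endpoint to cut down on HTTP round-trips. This is only an illustrative
# assumption about a possible optimisation; `index_batch`, `base_url` and the
# buffering strategy are hypothetical names introduced here, not part of the
# pipeline above.
def index_batch(session: requests.Session, base_url: str, entity: str, docs: list[dict]) -> requests.Response:
    """Index a batch of documents into `{entity}_index` with a single `_bulk` request."""
    lines = []
    for doc in docs:
        # One action line followed by one source line per document, newline-delimited.
        lines.append(json.dumps({"index": {"_id": doc["local_identifier"]}}))
        lines.append(json.dumps(doc))
    body = "\n".join(lines) + "\n"  # a bulk payload must end with a trailing newline
    return session.post(
        f"{base_url}/{entity}_index/_bulk",
        data=body,
        headers={"Content-Type": "application/x-ndjson"},
        verify=False,
    )
# Typical use would be to buffer a few hundred parsed lines from the gzip stream,
# call index_batch once per buffer, and refresh the index at the end as before.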