From 72ddac35cb6d56a6534dca214e2131e5d9a8f1a0 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Tue, 6 Aug 2024 11:20:52 +0200 Subject: [PATCH] initial stage --- airflow/dags/dag_utils.py | 23 +++++++++++++++++++++++ airflow/dags/remove_old_indexes.py | 15 +++------------ 2 files changed, 26 insertions(+), 12 deletions(-) create mode 100644 airflow/dags/dag_utils.py diff --git a/airflow/dags/dag_utils.py b/airflow/dags/dag_utils.py new file mode 100644 index 0000000..8d1d7d9 --- /dev/null +++ b/airflow/dags/dag_utils.py @@ -0,0 +1,23 @@ +from airflow.hooks.base import BaseHook +from opensearchpy import OpenSearch +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + + +def get_opensearch_client(kwargs) -> OpenSearch: + conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"]) + return OpenSearch( + hosts=[{'host': conn.host, 'port': conn.port}], + http_auth=(conn.login, conn.password), + use_ssl=True, + verify_certs=False, + ssl_show_warn=False, + pool_maxsize=20, + timeout=180 + ) + + +def get_bucket_name(context: dict, hook: S3Hook, param_name: str): + bucket_name = context["params"][param_name] + if not bucket_name: + bucket_name = hook.extra_args['bucket_name'] + return bucket_name diff --git a/airflow/dags/remove_old_indexes.py b/airflow/dags/remove_old_indexes.py index bd1ec68..2d41ada 100644 --- a/airflow/dags/remove_old_indexes.py +++ b/airflow/dags/remove_old_indexes.py @@ -1,21 +1,12 @@ import json -from datetime import datetime, timedelta -import requests -from airflow import DAG -from airflow.operators.python_operator import PythonOperator -from airflow.providers.http.hooks.http import HttpHook -import os from datetime import timedelta import pendulum from airflow.decorators import dag from airflow.decorators import task -from airflow.exceptions import AirflowSkipException -from airflow.models.param import Param from airflow.operators.python import get_current_context -from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from dag_utils import get_bucket_name, get_opensearch_client +from dag_utils import get_opensearch_client # Define default arguments default_args = { @@ -62,13 +53,13 @@ def remove_old_indexes(): if base_name not in index_dict: index_dict[base_name] = [] index_dict[base_name].append((index_name, timestamp)) - + for base_name, index_list in index_dict.items(): index_list.sort(key=lambda x: x[1], reverse=True) most_recent_index = index_list[0][0] for index_name, timestamp in index_list: if index_name != most_recent_index and index_name not in alias_index_names: - #hook.run(f'/{index_name}') + # hook.run(f'/{index_name}') print(f'Deleted index: {index_name}') remove_indexes()