# airflow/dags/remove_old_indexes.py
"""Airflow DAG that prunes superseded, timestamp-suffixed OpenSearch indexes.

Indexes are expected to be named ``<base_name>_<timestamp>``. For every base
name the most recent index is kept, as is any index that an alias points to;
all remaining indexes of that group are removal candidates.

The DAG has no schedule and must be triggered manually. It runs in dry-run
mode unless the ``DRY_RUN`` param is set to ``False``.
"""
import json
import logging
import os
from collections import defaultdict
from datetime import datetime, timedelta

import pendulum
import requests
from airflow import DAG
from airflow.decorators import dag
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.param import Param
from airflow.operators.python import get_current_context
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook

from dag_utils import get_bucket_name, get_opensearch_client

logger = logging.getLogger(__name__)

# Define default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def _find_stale_indexes(indexes, aliases):
    """Return the names of indexes that are superseded and not aliased.

    Args:
        indexes: Iterable of mappings with an ``'index'`` key holding the
            index name (the JSON rows of ``_cat/indices``).
        aliases: Iterable of mappings with an ``'index'`` key holding the name
            of the index an alias points to (the JSON rows of ``_cat/aliases``).

    Returns:
        A list of index names. Within each ``<base_name>_<suffix>`` group the
        index with the greatest suffix is excluded, as are aliased indexes.
        Index names without an underscore are never returned.
    """
    aliased_names = {alias['index'] for alias in aliases}

    # Group on everything before the last underscore; the part after it is
    # treated as the version timestamp.
    # NOTE(review): any name containing '_' is grouped, even when its suffix
    # is not a timestamp - check the dry-run output before enabling deletion.
    versions_by_base = defaultdict(list)
    for entry in indexes:
        name = entry['index']
        base_name, separator, timestamp = name.rpartition('_')
        if separator:
            versions_by_base[base_name].append((name, timestamp))

    stale = []
    for versions in versions_by_base.values():
        # Suffixes are compared as strings; this assumes they are fixed-width
        # so that lexicographic order equals chronological order - TODO confirm.
        versions.sort(key=lambda version: version[1], reverse=True)
        newest_name = versions[0][0]
        stale.extend(
            name
            for name, _ in versions
            if name != newest_name and name not in aliased_names
        )
    return stale


@dag(
    dag_id="remove_old_indexes",
    dag_display_name="Remove outdated MKG indexes",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "OPENSEARCH_CONN_ID": "opensearch_default",
        # Safe by default: only report what would be deleted.
        "DRY_RUN": True,
    },
    tags=["opensearch", "maintenance"],
)
def remove_old_indexes():
    """Define the single-task DAG that removes outdated indexes."""

    @task
    def remove_indexes():
        """List indexes and aliases, then delete (or report) the stale ones."""
        context = get_current_context()
        client = get_opensearch_client(context)
        dry_run = context["params"].get("DRY_RUN", True)

        # Without format="json" the cat API answers with a plain-text table,
        # which cannot be indexed by column name below.
        indexes = client.cat.indices(format="json")
        aliases = client.cat.aliases(format="json")

        logger.info("Aliases: %s", json.dumps(aliases))
        logger.info("Indexes: %s", json.dumps(indexes))

        for index_name in _find_stale_indexes(indexes, aliases):
            if dry_run:
                logger.info("Dry run - would delete index: %s", index_name)
                continue
            client.indices.delete(index=index_name)
            logger.info("Deleted index: %s", index_name)

    remove_indexes()


remove_old_indexes()