initial stage

This commit is contained in:
Giambattista Bloisi 2024-08-06 11:19:01 +02:00
parent 118e29f462
commit a2e7c4beb6
1 changed file with 77 additions and 0 deletions

View File

@@ -0,0 +1,77 @@
import json
from datetime import datetime, timedelta
import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.http.hooks.http import HttpHook
import os
from datetime import timedelta
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.models.param import Param
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from dag_utils import get_bucket_name, get_opensearch_client
# Default arguments handed to the DAG below via ``default_args=``:
# no e-mail notifications, no dependency on previous runs, and a single
# retry five minutes after a failure.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def _find_stale_indexes(indexes, aliases):
    """Return the names of timestamped indexes that are candidates for removal.

    Index names of the form ``<base>_<suffix>`` are grouped by ``<base>``
    (everything before the last underscore). Within each group the index
    whose suffix sorts highest is kept; every other index of the group is
    returned unless an alias still points at it. Names without an underscore
    are ignored.

    Args:
        indexes: Iterable of mappings, each exposing the index name under
            the ``'index'`` key.
        aliases: Iterable of mappings, each exposing the aliased index name
            under the ``'index'`` key.

    Returns:
        A list of index names, grouped by base name in first-seen order and,
        within a group, ordered from newest to oldest suffix.
    """
    aliased_names = {alias['index'] for alias in aliases}

    versions_by_base = {}
    for entry in indexes:
        name = entry['index']
        if '_' not in name:
            continue
        base_name, suffix = name.rsplit('_', 1)
        versions_by_base.setdefault(base_name, []).append((name, suffix))

    stale = []
    for versions in versions_by_base.values():
        # NOTE(review): suffixes are compared as strings, so this only picks
        # the newest index if the timestamps are fixed-width / zero-padded --
        # confirm against the index naming scheme.
        versions.sort(key=lambda version: version[1], reverse=True)
        # versions[0] is the most recent index of the group and is always kept.
        stale.extend(
            name for name, _ in versions[1:] if name not in aliased_names
        )
    return stale


@dag(
    dag_id="remove_old_indexes",
    dag_display_name="Remove outdated MKG indexes",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "OPENSEARCH_CONN_ID": "opensearch_default",
    },
    tags=["opensearch", "maintenance"],
)
def remove_old_indexes():
    """Manually triggered DAG that reports outdated timestamped indexes.

    For every ``<base>_<suffix>`` index family the newest index and any
    index referenced by an alias are kept; the remaining ones are reported.
    """

    @task
    def remove_indexes():
        """List indexes and aliases, then report the outdated indexes."""
        context = get_current_context()
        client = get_opensearch_client(context)

        # The cat APIs answer with a plain-text table unless JSON is requested;
        # the code below needs a list of dicts with an 'index' key.
        # NOTE(review): assumes get_opensearch_client returns an opensearch-py
        # client -- confirm in dag_utils.
        indexes = client.cat.indices(format="json")
        aliases = client.cat.aliases(format="json")
        print(json.dumps(aliases))
        print(json.dumps(indexes))

        for index_name in _find_stale_indexes(indexes, aliases):
            # TODO: deletion is intentionally not wired in yet (initial stage);
            # once enabled, issue the delete through `client` here.
            print(f'Would delete index: {index_name} (dry run, nothing removed)')

    remove_indexes()


remove_old_indexes()