initial stage
This commit is contained in:
parent
a2e7c4beb6
commit
72ddac35cb
|
@ -0,0 +1,23 @@
|
||||||
|
from airflow.hooks.base import BaseHook
|
||||||
|
from opensearchpy import OpenSearch
|
||||||
|
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||||
|
|
||||||
|
|
||||||
|
def get_opensearch_client(kwargs) -> OpenSearch:
|
||||||
|
conn = BaseHook.get_connection(kwargs["params"]["OPENSEARCH_CONN_ID"])
|
||||||
|
return OpenSearch(
|
||||||
|
hosts=[{'host': conn.host, 'port': conn.port}],
|
||||||
|
http_auth=(conn.login, conn.password),
|
||||||
|
use_ssl=True,
|
||||||
|
verify_certs=False,
|
||||||
|
ssl_show_warn=False,
|
||||||
|
pool_maxsize=20,
|
||||||
|
timeout=180
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_bucket_name(context: dict, hook: S3Hook, param_name: str):
|
||||||
|
bucket_name = context["params"][param_name]
|
||||||
|
if not bucket_name:
|
||||||
|
bucket_name = hook.extra_args['bucket_name']
|
||||||
|
return bucket_name
|
|
@ -1,21 +1,12 @@
|
||||||
import json
|
import json
|
||||||
from datetime import datetime, timedelta
|
|
||||||
import requests
|
|
||||||
from airflow import DAG
|
|
||||||
from airflow.operators.python_operator import PythonOperator
|
|
||||||
from airflow.providers.http.hooks.http import HttpHook
|
|
||||||
import os
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
import pendulum
|
import pendulum
|
||||||
from airflow.decorators import dag
|
from airflow.decorators import dag
|
||||||
from airflow.decorators import task
|
from airflow.decorators import task
|
||||||
from airflow.exceptions import AirflowSkipException
|
|
||||||
from airflow.models.param import Param
|
|
||||||
from airflow.operators.python import get_current_context
|
from airflow.operators.python import get_current_context
|
||||||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
|
||||||
|
|
||||||
from dag_utils import get_bucket_name, get_opensearch_client
|
from dag_utils import get_opensearch_client
|
||||||
|
|
||||||
# Define default arguments
|
# Define default arguments
|
||||||
default_args = {
|
default_args = {
|
||||||
|
@ -62,13 +53,13 @@ def remove_old_indexes():
|
||||||
if base_name not in index_dict:
|
if base_name not in index_dict:
|
||||||
index_dict[base_name] = []
|
index_dict[base_name] = []
|
||||||
index_dict[base_name].append((index_name, timestamp))
|
index_dict[base_name].append((index_name, timestamp))
|
||||||
|
|
||||||
for base_name, index_list in index_dict.items():
|
for base_name, index_list in index_dict.items():
|
||||||
index_list.sort(key=lambda x: x[1], reverse=True)
|
index_list.sort(key=lambda x: x[1], reverse=True)
|
||||||
most_recent_index = index_list[0][0]
|
most_recent_index = index_list[0][0]
|
||||||
for index_name, timestamp in index_list:
|
for index_name, timestamp in index_list:
|
||||||
if index_name != most_recent_index and index_name not in alias_index_names:
|
if index_name != most_recent_index and index_name not in alias_index_names:
|
||||||
#hook.run(f'/{index_name}')
|
# hook.run(f'/{index_name}')
|
||||||
print(f'Deleted index: {index_name}')
|
print(f'Deleted index: {index_name}')
|
||||||
|
|
||||||
remove_indexes()
|
remove_indexes()
|
||||||
|
|
Loading…
Reference in New Issue