initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-25 22:09:41 +01:00
parent c9f23d2796
commit 4398546095
1 changed files with 10 additions and 10 deletions

View File

@ -12,7 +12,7 @@ from airflow.decorators import task
from airflow.operators.python import PythonOperator from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.helpers import chain from airflow.utils.helpers import chain
from airflow.models import Variable from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers from opensearchpy import OpenSearch, helpers
from EOSC_indexes import mappings from EOSC_indexes import mappings
@ -21,9 +21,6 @@ S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-portal-import") EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-portal-import")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
ENTITIES = ["interoperability", "services", "training"] ENTITIES = ["interoperability", "services", "training"]
@ -48,9 +45,10 @@ default_args = {
def import_EOSC_catalog(): def import_EOSC_catalog():
@task @task
def create_indexes(): def create_indexes():
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch( client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), http_auth=(conn.login, conn.password),
use_ssl=True, use_ssl=True,
verify_certs=False, verify_certs=False,
ssl_show_warn=False, ssl_show_warn=False,
@ -95,9 +93,10 @@ def import_EOSC_catalog():
@task @task
def bulk_load(files: list[(str, str)]): def bulk_load(files: list[(str, str)]):
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch( client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), http_auth=(conn.login, conn.password),
use_ssl=True, use_ssl=True,
verify_certs=False, verify_certs=False,
ssl_show_warn=False, ssl_show_warn=False,
@ -135,9 +134,10 @@ def import_EOSC_catalog():
@task @task
def close_indexes(): def close_indexes():
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch( client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), http_auth=(conn.login, conn.password),
use_ssl=True, use_ssl=True,
verify_certs=False, verify_certs=False,
ssl_show_warn=False, ssl_show_warn=False,