initial stage

This commit is contained in:
Giambattista Bloisi 2024-03-25 21:19:50 +01:00
parent 00514edfbd
commit b86cf359f5
2 changed files with 9 additions and 17 deletions

View File

@ -22,7 +22,6 @@ EOSC_CATALOG_BUCKET = os.getenv("EOSC_CATALOG_BUCKET", "eosc-catalog")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local") OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_URL = Variable.get("OPENSEARCH_URL", "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin") OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin") OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")

View File

@ -14,7 +14,7 @@ from airflow.decorators import task
from airflow.operators.python import PythonOperator from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.helpers import chain from airflow.utils.helpers import chain
from airflow.models import Variable from airflow.hooks.base import BaseHook
from opensearchpy import OpenSearch, helpers from opensearchpy import OpenSearch, helpers
from EOSC_indexes import mappings from EOSC_indexes import mappings
@ -23,40 +23,32 @@ from EOSC_entity_trasform import transform_entities
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu") S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu")
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn") S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6)) EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_URL = Variable.get("OPENSEARCH_URL",
"https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
ENTITIES = ["datasource", "grants", "organizations", "persons", "products", "topics", "venues"] ENTITIES = ["datasource", "grants", "organizations", "persons", "products", "topics", "venues"]
BULK_PARALLELISM = 10 BULK_PARALLELISM = 10
#
default_args = { default_args = {
"execution_timeout": timedelta(days=EXECUTION_TIMEOUT), "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)), "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))), "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
} }
@dag( @dag(
schedule=None, schedule=None,
dagrun_timeout=None, dagrun_timeout=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False, catchup=False,
default_args=default_args, default_args=default_args,
tags=["lot1"], tags=["lot1"]
) )
def import_EOSC_graph(): def import_EOSC_graph():
@task @task
def create_indexes(): def create_indexes():
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch( client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), http_auth=(conn.login, conn.password),
use_ssl=True, use_ssl=True,
verify_certs=False, verify_certs=False,
ssl_show_warn=False, ssl_show_warn=False,
@ -127,9 +119,9 @@ def import_EOSC_graph():
) )
}) })
def bulk_load(files: list[(str, str)]): def bulk_load(files: list[(str, str)]):
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch( client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
use_ssl=True, use_ssl=True,
verify_certs=False, verify_certs=False,
ssl_show_warn=False, ssl_show_warn=False,
@ -186,8 +178,9 @@ def import_EOSC_graph():
@task @task
def close_indexes(): def close_indexes():
conn = BaseHook.get_connection('opensearch_default')
client = OpenSearch( client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}], hosts=[{'host': conn.host, 'port': conn.port}],
http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD), http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
use_ssl=True, use_ssl=True,
verify_certs=False, verify_certs=False,