simple test DAG

Giambattista Bloisi 2024-03-15 14:14:57 +01:00
parent ab172a39ff
commit 8262871be8
1 changed file with 36 additions and 16 deletions


@@ -17,11 +17,13 @@ from airflow.utils.file import TemporaryDirectory
 from airflow.utils.helpers import chain
 from airflow.models import Variable
+from opensearchpy import OpenSearch, helpers
 from opensearch_indexes import mappings
 
 S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
 AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
+OPENSEARCH_HOST= Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
 OPENSEARCH_URL= Variable.get("OPENSEARCH_URL", "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
 OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
 OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")
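A quick way to sanity-check the connection settings introduced in this hunk is a standalone opensearch-py connectivity probe. This is not part of the commit; the host, port, and credentials below simply mirror the DAG's defaults and would be replaced by the real Airflow Variable values in practice.

from opensearchpy import OpenSearch

# Values assumed to match the DAG defaults above (hypothetical test values)
OPENSEARCH_HOST = "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local"
OPENSEARCH_USER = "admin"
OPENSEARCH_PASSWD = "admin"

client = OpenSearch(
    hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
    http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
    use_ssl=True,
    verify_certs=False,  # the DAG also skips certificate verification
)

# ping() returns True if the cluster is reachable with these credentials
print(client.ping())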
@@ -118,10 +120,15 @@ def skg_if_pipeline():
     @task
     def bulk_load(files: list[(str, str)]):
-        session = requests.Session()
-        session.auth = (OPENSEARCH_USER, OPENSEARCH_PASSWD)
+        client = OpenSearch(
+            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
+            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
+            use_ssl=True,
+            verify_certs=False
+        )
         hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
 
+        def _generate_data():
         for (entity, key) in files:
             print(f'{entity}: {key}')
             s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
@@ -129,13 +136,26 @@ def skg_if_pipeline():
                 buff = io.BufferedReader(gzipfile)
                 for line in buff:
                     data = json.loads(line)
-                    response = session.post(
-                        f'{OPENSEARCH_URL}/{entity}_index/_doc/' + requests.utils.quote(
-                            data['local_identifier'], safe='') + "?refresh=false",
-                        data=json.dumps(data),
-                        headers={"Content-Type": "application/json"},
-                        verify=False)
-                    response.raise_for_status()
+                    data['_index'] = entity
+                    data['_id'] = data['local_identifier']
+                    yield data
+
+        succeeded = []
+        failed = []
+        for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
+            if success:
+                succeeded.append(item)
+            else:
+                failed.append(item)
+
+        if len(failed) > 0:
+            print(f"There were {len(failed)} errors:")
+            for item in failed:
+                print(item["index"]["error"])
+
+        if len(succeeded) > 0:
+            print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")
 
     @task
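For context, the pattern this commit adopts, replacing per-document HTTP POSTs with opensearch-py's helpers.parallel_bulk, can be exercised in isolation with the sketch below. It is not code from the repository: the host, credentials, index name, and sample documents are placeholders; only the action shape (_index, _id, remaining keys as the document body) matches what bulk_load now yields.

from opensearchpy import OpenSearch, helpers

client = OpenSearch(
    hosts=[{'host': 'localhost', 'port': 9200}],  # assumed local test cluster
    http_auth=('admin', 'admin'),                 # placeholder credentials
    use_ssl=True,
    verify_certs=False,
)

def generate_actions():
    # Stand-in for the records streamed from S3 in the DAG
    docs = [
        {'local_identifier': 'doc-1', 'title': 'first'},
        {'local_identifier': 'doc-2', 'title': 'second'},
    ]
    for doc in docs:
        doc['_index'] = 'products'            # hypothetical index name
        doc['_id'] = doc['local_identifier']  # same id convention as the DAG
        yield doc

succeeded, failed = [], []
# parallel_bulk returns an iterator of (ok, item) tuples, one per action
for ok, item in helpers.parallel_bulk(client, actions=generate_actions()):
    (succeeded if ok else failed).append(item)

print(f"indexed={len(succeeded)} errors={len(failed)}")

Because parallel_bulk chunks actions and indexes them from a thread pool, the generator lets the task stream documents out of the gzip files without materializing them all in memory, which is the main gain over the previous one-request-per-document approach.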