From 5d073deaa7f0637eb7ca16e502d4df9c057ca837 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Wed, 6 Mar 2024 23:29:08 +0100
Subject: [PATCH] simple test DAG

---
 airflow/dags/populate_opensearch.py | 79 +++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)
 create mode 100644 airflow/dags/populate_opensearch.py

diff --git a/airflow/dags/populate_opensearch.py b/airflow/dags/populate_opensearch.py
new file mode 100644
index 0000000..c091e82
--- /dev/null
+++ b/airflow/dags/populate_opensearch.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Simple test DAG that bulk loads gzipped JSON records from S3 into an OpenSearch index."""
+from __future__ import annotations
+
+import gzip
+import json
+import os
+from datetime import datetime
+
+import requests
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "zenodo-bucket")
+AWS_CONN_ID = os.getenv("ASTRO_AWS_S3_CONN_ID", "s3_conn")
+
+OPENSEARCH_URL = "https://opensearch-cluster.lot1-opensearch-cluster.svc.local"
+INDEX_NAME = "organization_index"
+
+DAG_ID = "populate_opensearch"
+
+
+@task
+def bulk_load():
+    hook = S3Hook(AWS_CONN_ID, transfer_config_args={"use_threads": False})
+    keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix="/organization/")
+
+    session = requests.Session()
+    session.auth = ("admin", "admin")
+
+    # Create the target index; certificate verification is disabled because
+    # the in-cluster endpoint serves a self-signed certificate.
+    session.put(
+        f"{OPENSEARCH_URL}/{INDEX_NAME}",
+        data=json.dumps({"settings": {"number_of_shards": 1, "number_of_replicas": 1}}),
+        headers={"Content-Type": "application/json"},
+        verify=False,
+    )
+
+    for key in keys:
+        # get_key returns a boto3 S3.Object whose body can be streamed
+        # and decompressed line by line without downloading the whole file.
+        key_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
+        with gzip.GzipFile(fileobj=key_obj.get()["Body"]) as gzipfile:
+            for line in gzipfile:
+                # Each line is a standalone JSON document.
+                session.post(
+                    f"{OPENSEARCH_URL}/{INDEX_NAME}/_doc",
+                    data=line,
+                    headers={"Content-Type": "application/json"},
+                    verify=False,
+                )
+
+
+with DAG(
+    DAG_ID,
+    default_args={"retries": 1},
+    tags=["example"],
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    bulk_load_data = bulk_load()
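
--
Note on the loading loop (not part of the patch): posting one document per request is the simplest thing that works, but for large dumps OpenSearch's _bulk endpoint avoids one round trip per document. A minimal sketch, reusing the authenticated `session`, the `OPENSEARCH_URL`/`INDEX_NAME` constants, and the gzipped newline-delimited JSON input from bulk_load() above; `bulk_index`, `_flush`, and the BULK_CHUNK batch size are hypothetical names introduced here, not part of the DAG:

    BULK_CHUNK = 500  # hypothetical batch size; tune to document size and cluster capacity

    def _flush(session, ndjson_lines):
        # The _bulk API takes alternating action/source lines as NDJSON and
        # requires a trailing newline after the last line.
        session.post(
            f"{OPENSEARCH_URL}/_bulk",
            data="\n".join(ndjson_lines) + "\n",
            headers={"Content-Type": "application/x-ndjson"},
            verify=False,
        )

    def bulk_index(session, lines, index=INDEX_NAME):
        action = json.dumps({"index": {"_index": index}})
        batch = []
        for line in lines:
            batch.append(action)
            batch.append(line.decode("utf-8").rstrip("\n"))
            if len(batch) >= 2 * BULK_CHUNK:  # two NDJSON lines per document
                _flush(session, batch)
                batch = []
        if batch:
            _flush(session, batch)

The response of each _bulk call reports per-document errors in its "items" array, so a production version would also check `errors` in the returned JSON rather than relying on the HTTP status alone.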