simple test DAG
This commit is contained in:
parent
2937d77cba
commit
5d073deaa7
|
@ -0,0 +1,85 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Example HTTP operator and sensor"""
|
||||
from __future__ import annotations
|
||||
|
||||
import gzip
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
from airflow import DAG
|
||||
from airflow.providers.http.operators.http import HttpOperator
|
||||
from airflow.providers.http.sensors.http import HttpSensor
|
||||
from airflow.decorators import task
|
||||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||
|
||||
# Runtime configuration, each value overridable through the environment.
# NOTE(review): in the visible part of this file only S3_BUCKET_NAME,
# AWS_CONN_ID and DAG_ID are referenced; the rest look like leftovers from an
# S3 example DAG -- confirm before removing.
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "zenodo-bucket")
S3_BUCKET_KEY = os.getenv("S3_BUCKET_KEY", "test")
S3_BUCKET_KEY_LIST = os.getenv("S3_BUCKET_KEY_LIST", "test2")
S3_BUCKET_WILDCARD_KEY = os.getenv("S3_BUCKET_WILDCARD_KEY", "test*")
PREFIX = os.getenv("S3_PREFIX", "test")
# Defaults are given as strings so both branches of getenv yield the same type
# before conversion.
INACTIVITY_PERIOD = float(os.getenv("INACTIVITY_PERIOD", "5"))
AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
LOCAL_FILE_PATH = os.getenv("LOCAL_FILE_PATH", "/usr/local/airflow/dags/example_s3_test_file.txt")
# The connection id is read from ASTRO_AWS_S3_CONN_ID, not AWS_CONN_ID.
AWS_CONN_ID = os.getenv("ASTRO_AWS_S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", "6"))

# None when SYSTEM_TESTS_ENV_ID is not set.
ENV_ID = os.getenv("SYSTEM_TESTS_ENV_ID")
DAG_ID = "populate_opensearch"
|
||||
|
||||
|
||||
@task
def bulk_load():
    """Stream gzipped organization records from S3 into OpenSearch.

    Lists every key under the ``/organization/`` prefix of ``S3_BUCKET_NAME``,
    creates ``organization_index`` (1 shard, 1 replica), then decompresses each
    object and posts every non-blank line as one JSON document.

    Raises:
        requests.HTTPError: if index creation fails for a reason other than
            the index already existing, or if a document is rejected.
    """
    index_url = "https://opensearch-cluster.lot1-opensearch-cluster.svc.local/organization_index"
    json_headers = {"Content-Type": "application/json"}
    index_settings = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1,
        },
    }

    hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
    keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix='/organization/')

    # The context manager releases pooled connections when the task ends.
    with requests.Session() as session:
        # NOTE(review): credentials are hard-coded and TLS verification is
        # disabled on every call below; acceptable for a test cluster only.
        session.auth = ("admin", "admin")

        created = session.put(
            index_url,
            data=json.dumps(index_settings),
            headers=json_headers,
            verify=False,
        )
        # A retried run finds the index already present; that 400 is expected,
        # any other failure must surface.
        already_exists = (
            created.status_code == 400
            and "resource_already_exists_exception" in created.text
        )
        if not already_exists:
            created.raise_for_status()

        for key in keys:
            # get_key already returns the S3 object; S3Hook exposes no
            # ``Object`` attribute, so the body is read from this handle.
            key_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)

            with gzip.GzipFile(fileobj=key_obj.get()["Body"]) as gzipfile:
                for line in gzipfile:
                    # Blank lines carry no document and would be rejected.
                    if not line.strip():
                        continue
                    response = session.post(
                        f"{index_url}/_doc",
                        data=line,
                        headers=json_headers,
                        verify=False,
                    )
                    # Fail the task rather than silently dropping a record.
                    response.raise_for_status()
|
||||
|
||||
# DAG wiring: one task that loads the S3 data into OpenSearch.
with DAG(
    DAG_ID,
    default_args={"retries": 1},
    tags=["example"],
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # The decorated function must be *called* inside the DAG context to
    # register a task; assigning the bare function leaves the DAG empty.
    bulk_load_data = bulk_load()
|
||||
|
Loading…
Reference in New Issue