simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-06 23:51:53 +01:00
parent 3e6c175901
commit e99002329e
1 changed files with 4 additions and 4 deletions

View File

@ -48,9 +48,6 @@ DAG_ID = "populate_opensearch"
@task @task
def bulk_load(): def bulk_load():
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix='/organization/')
session = requests.Session() session = requests.Session()
session.auth = ("admin", "admin") session.auth = ("admin", "admin")
@ -63,11 +60,14 @@ def bulk_load():
}), }),
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
verify=False) verify=False)
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix='/organization/')
for key in keys: for key in keys:
print(key)
s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME) s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile: with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
for line in gzipfile: for line in gzipfile:
print(line)
session.post("https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/organization_index/_doc", session.post("https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/organization_index/_doc",
data=line, data=line,
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},