# lot1-kickoff/airflow/dags/skg_if_pipeline.py
# (source-viewer metadata: 134 lines, 4.9 KiB, Python)

from __future__ import annotations

import gzip
import io
import json
import logging
import os
import zipfile
from contextlib import closing
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain

from opensearch_indexes import mappings
# Object storage: bucket holding the SKG-IF dump and the Airflow connection id
# used to reach it.
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME", "skg-if")
AWS_CONN_ID = os.environ.get("S3_CONN_ID", "s3_conn")


def _env_int(name, default):
    """Return environment variable *name* converted to int, or *default*."""
    return int(os.environ.get(name, default))


# Per-task execution timeout, in hours.
EXECUTION_TIMEOUT = _env_int("EXECUTION_TIMEOUT", 6)

# OpenSearch endpoint and basic-auth credentials.
# NOTE(review): the "admin"/"admin" defaults look like development
# placeholders — confirm deployments always override them.
OPENSEARCH_URL = os.environ.get(
    "OPENSEARCH_URL",
    "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200",
)
OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWD = os.environ.get("OPENSEARCH_PASSWORD", "admin")

# Task defaults (timeout, retry count, delay between retries) handed to the DAG.
default_args = dict(
    execution_timeout=timedelta(hours=EXECUTION_TIMEOUT),
    retries=_env_int("DEFAULT_TASK_RETRIES", 1),
    retry_delay=timedelta(seconds=_env_int("DEFAULT_RETRY_DELAY_SECONDS", 60)),
)
def strip_prefix(s, p):
    """Return *s* with the leading *p* removed.

    If *s* does not start with *p*, it is returned unchanged.
    """
    return s[len(p):] if s.startswith(p) else s
# Entities contained in the SKG-IF dump. Each one is read from the
# "<entity>/" prefix of S3_BUCKET_NAME and loaded into "<entity>_index".
SKG_IF_ENTITIES = (
    "datasources",
    "grants",
    "organizations",
    "persons",
    "products",
    "topics",
    "venues",
)

# Settings applied to every index created by _recreate_index.
# NOTE(review): refresh_interval -1 disables periodic refreshes (presumably to
# speed up the load). It is never reset afterwards, so documents written later
# only become searchable after an explicit _refresh — confirm this is intended.
_INDEX_SETTINGS = {
    "index": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "refresh_interval": -1,
        "codec": "zstd_no_dict",
        "replication.type": "SEGMENT",
    }
}


def _index_url(entity):
    """Return the OpenSearch URL of the index that holds *entity* documents."""
    return f"{OPENSEARCH_URL}/{entity}_index"


def _recreate_index(session, entity):
    """Drop ``<entity>_index`` if it exists and create it again, empty.

    The new index gets ``_INDEX_SETTINGS`` and ``mappings[entity]``.

    Raises:
        requests.HTTPError: if the delete fails with anything other than 404,
            or if the create fails.
    """
    url = _index_url(entity)
    # verify=False: TLS certificates are not checked. NOTE(review): presumably
    # the in-cluster endpoint uses a self-signed certificate — confirm.
    response = session.delete(url, verify=False)
    # A 404 only means the index does not exist yet (e.g. the very first run).
    if response.status_code != 404:
        response.raise_for_status()
    response = session.put(
        url,
        json={"settings": _INDEX_SETTINGS, "mappings": mappings[entity]},
        verify=False,
    )
    response.raise_for_status()


def _iter_json_lines(stream):
    """Yield one parsed JSON value per non-blank line of gzip-compressed *stream*.

    *stream* is a binary file-like object. It is not closed here, because
    GzipFile leaves a caller-supplied fileobj open.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with gzip.GzipFile(fileobj=stream) as gz:
        for line in gz:
            if line.strip():
                yield json.loads(line)


def _index_document(session, entity, document):
    """Index *document* into ``<entity>_index``.

    The document id is its URL-quoted ``local_identifier``.

    Raises:
        KeyError: if *document* has no ``local_identifier``.
        requests.HTTPError: if OpenSearch rejects the document.
    """
    doc_id = requests.utils.quote(document["local_identifier"], safe="")
    response = session.post(
        f"{_index_url(entity)}/_doc/{doc_id}",
        params={"refresh": "false"},
        json=document,
        verify=False,
    )
    response.raise_for_status()


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "s3"],
)
def skg_if_pipeline():
    """Unpack the SKG-IF dump archive on S3 and load each entity into OpenSearch."""

    @task
    def unzip_to_s3(key: str, bucket: str):
        """Download the zip archive *key* from *bucket* and upload its members.

        Each extracted file is uploaded to S3_BUCKET_NAME under its path
        relative to the archive root (e.g. ``products/part-0.json.gz``),
        replacing any existing object. A member with the same file name as
        the archive is skipped.
        """
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={"use_threads": False})
        archive_name = os.path.basename(key)
        with TemporaryDirectory() as dwl_dir, TemporaryDirectory() as tmp_dir:
            # Use the path reported by the hook rather than rebuilding it:
            # with preserve_file_name the local name is only the key's last
            # path segment, not the full key.
            archive = hook.download_file(
                key=key,
                bucket_name=bucket,
                local_path=dwl_dir,
                preserve_file_name=True,
                use_autogenerated_subdir=False,
            )
            with zipfile.ZipFile(archive, "r") as zip_ref:
                zip_ref.extractall(tmp_dir)
            for root, _, files in os.walk(tmp_dir):
                for file in files:
                    if file == archive_name:
                        continue
                    local_file_path = os.path.join(root, file)
                    # Relative key without a leading "/", so the objects match
                    # the "<entity>/" prefix that bulk_load lists.
                    s3_key = os.path.relpath(local_file_path, tmp_dir).replace(os.sep, "/")
                    hook.load_file(local_file_path, s3_key, S3_BUCKET_NAME, replace=True)
        return ""

    @task
    def bulk_load(entity: str):
        """Rebuild ``<entity>_index`` from the dump files under ``<entity>/``.

        The steps are:
        - Drop and re-create the index.
        - Index every document from each gzip-compressed JSON-lines object
          under the ``<entity>/`` prefix of S3_BUCKET_NAME.
        - Refresh the index once at the end.

        Raises:
            requests.HTTPError: if an OpenSearch request fails.
        """
        log = logging.getLogger("airflow.task")
        with requests.Session() as session:
            session.auth = (OPENSEARCH_USER, OPENSEARCH_PASSWD)
            _recreate_index(session, entity)

            hook = S3Hook(AWS_CONN_ID, transfer_config_args={"use_threads": False})
            keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f"{entity}/") or []
            for key in keys:
                log.info("Indexing s3://%s/%s", S3_BUCKET_NAME, key)
                body = hook.get_key(key, bucket_name=S3_BUCKET_NAME).get()["Body"]
                # GzipFile does not close a caller-supplied fileobj, so release
                # the S3 stream explicitly once it has been consumed.
                with closing(body):
                    for document in _iter_json_lines(body):
                        _index_document(session, entity, document)

            response = session.post(f"{_index_url(entity)}/_refresh", verify=False)
            response.raise_for_status()

    # Unzip first, then load all entities in parallel.
    chain(
        unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME),
        [
            bulk_load.override(task_id=f"load_{entity}")(entity)
            for entity in SKG_IF_ENTITIES
        ],
    )
# Instantiate the DAG at import time. Binding it to a module-level name keeps
# it in this module's globals(), so the scheduler discovers it even when DAG
# auto-registration is unavailable or disabled.
skg_if_pipeline_dag = skg_if_pipeline()