from __future__ import annotations

import gzip
import io
import json
import os
import zipfile
from datetime import timedelta

import pendulum
import requests
from airflow.decorators import dag
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain
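
# Runtime configuration is read from environment variables so the same DAG can
# be pointed at a different bucket, S3 connection, or timeout/retry policy
# without code changes.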
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")

AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
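

# Helper used below to turn a local extraction path into an S3 key by dropping
# the temporary-directory prefix.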
def strip_prefix(s, p):
    if s.startswith(p):
        return s[len(p):]
    else:
        return s
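

# Manually triggered pipeline (schedule=None): it unpacks a dump archive stored
# in S3 and bulk-loads each entity type into OpenSearch.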
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "s3"],
)
def skg_if_pipeline():
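    # Download the zip archive from S3, extract it locally and upload the
    # extracted files back to the bucket, using their paths relative to the
    # extraction directory as S3 keys.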
    @task
    def unzip_to_s3(key: str, bucket: str):
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})

        with TemporaryDirectory() as dwl_dir:
            with TemporaryDirectory() as tmp_dir:
                archive = f'{dwl_dir}/{key}'
                hook.download_file(key=key, bucket_name=bucket, local_path=dwl_dir, preserve_file_name=True,
                                   use_autogenerated_subdir=False)
                with zipfile.ZipFile(archive, 'r') as zip_ref:
                    zip_ref.extractall(tmp_dir)

                # Upload every extracted file, skipping anything named like the
                # archive itself.
                for root, _, files in os.walk(tmp_dir):
                    for file in files:
                        if file == key:
                            continue
                        local_file_path = os.path.join(root, file)
                        hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME,
                                       replace=True)
        return ""
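
    # Recreate the OpenSearch index for one entity type and stream every
    # gzipped JSON-lines object under the '<entity>/' prefix in S3 into it,
    # one document per line.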
    @task
    def bulk_load(entity: str):
        session = requests.Session()
        # NOTE: credentials are hardcoded and TLS verification is disabled for
        # the in-cluster OpenSearch endpoint.
        session.auth = ("admin", "admin")

        # Drop any previous index, then recreate it with automatic refreshes
        # disabled (refresh_interval = -1) to speed up bulk indexing.
        session.delete(f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index',
                       verify=False)

        session.put(f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index',
                    data=json.dumps({
                        "settings": {
                            "index.number_of_shards": 1,
                            "index.number_of_replicas": 1,
                            "index.refresh_interval": -1
                        }
                    }),
                    headers={"Content-Type": "application/json"},
                    verify=False)

        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
        keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
        print(keys)
        for key in keys:
            print(key)
            s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
            # Decompress the object on the fly and index each JSON line under
            # its 'local_identifier', URL-encoded so it is safe in the request
            # path.
            with gzip.GzipFile(fileobj=s3_obj.get()["Body"]) as gzipfile:
                buff = io.BufferedReader(gzipfile)
                for line in buff:
                    data = json.loads(line)
                    session.post(
                        f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_doc/' + requests.utils.quote(
                            data['local_identifier'], safe='') + "?refresh=false",
                        data=json.dumps(data),
                        headers={"Content-Type": "application/json"},
                        verify=False)
        # Make the newly indexed documents searchable with one final refresh.
        session.post(
            f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh',
            verify=False)
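
    # Run the unzip step first, then load all entity types; the bulk_load
    # tasks have no dependencies on each other and can run in parallel.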
    chain(
        unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME),
        [bulk_load.override(task_id="load_datasources")("datasources"),
         bulk_load("grants"),
         bulk_load("organizations"),
         bulk_load("persons"),
         bulk_load("products"),
         bulk_load("topics"),
         bulk_load("venues")
         ]
    )
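

# Instantiate the DAG so it is registered when Airflow parses this file.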
skg_if_pipeline()