initial stage
This commit is contained in:
parent 0738f8bebc
commit 2f5430d9c8

@@ -0,0 +1,243 @@
from __future__ import annotations

import gzip
import io
import json
import logging
import os
import zipfile
from datetime import timedelta

from kubernetes.client import models as k8s
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain
from airflow.models import Variable

from opensearchpy import OpenSearch, helpers
from opensearch_indexes import mappings

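# Configuration: S3 bucket and connection, OpenSearch endpoint and credentials,
# and the entity types that become OpenSearch index names. Values are read from
# environment variables and Airflow Variables, with local defaults.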
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu")
AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_URL = Variable.get("OPENSEARCH_URL", "https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200")
OPENSEARCH_USER = Variable.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWD = Variable.get("OPENSEARCH_PASSWORD", "admin")

ENTITIES = ["datasources", "grants", "organizations", "persons", "products", "topics", "venues"]

BULK_PARALLELISM = 10

default_args = {
    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}


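# Return s without the leading prefix p; used below to turn extracted local file
# paths back into S3 object keys.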
def strip_prefix(s, p):
    if s.startswith(p):
        return s[len(p):]
    else:
        return s


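# Manually triggered DAG (schedule=None) that recreates the OpenSearch indexes,
# splits the gzipped JSONL dumps stored in S3 into batches, bulk-loads them in
# parallel, and refreshes the indexes at the end.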
@dag(
    schedule=None,
    dagrun_timeout=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "s3"],
)
def import_EOSC_graph():
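    # Download a zip archive from S3, extract it locally, and upload the
    # extracted files back to the bucket (skipping the archive itself).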
    @task
    def unzip_to_s3(key: str, bucket: str):
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})

        with TemporaryDirectory() as dwl_dir:
            with TemporaryDirectory() as tmp_dir:
                archive = f'{dwl_dir}/{key}'
                hook.download_file(key=key, bucket_name=bucket, local_path=dwl_dir, preserve_file_name=True,
                                   use_autogenerated_subdir=False)
                with zipfile.ZipFile(archive, 'r') as zip_ref:
                    zip_ref.extractall(tmp_dir)

                for root, _, files in os.walk(tmp_dir):
                    for file in files:
                        if file == key:
                            continue
                        local_file_path = os.path.join(root, file)
                        hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME,
                                       replace=True)
        return ""

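    # Recreate one index per entity with settings tuned for bulk ingestion
    # (no replicas, refresh disabled) and the mappings shipped with this DAG.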
    @task
    def create_indexes():
        client = OpenSearch(
            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20
        )

        client.cluster.put_settings(body={
            "persistent": {
                "cluster.routing.allocation.balance.prefer_primary": True,
                "segrep.pressure.enabled": True
            }
        })

        for entity in ENTITIES:
            if client.indices.exists(entity):
                client.indices.delete(entity)

            client.indices.create(entity, {
                "settings": {
                    "index": {
                        "number_of_shards": 40,
                        "number_of_replicas": 0,
                        "refresh_interval": -1,
                        "translog.flush_threshold_size": "2048MB",
                        "codec": "zstd_no_dict",
                        "replication.type": "SEGMENT"
                    }
                },
                "mappings": mappings[entity]
                # "mappings": {
                #     "dynamic": False,
                #     "properties": {
                #         "local_identifier": {
                #             "type": "keyword"
                #         }
                #     }
                # }
            })

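    # Build the work list for the mapped bulk_load tasks: one (entity, key) pair
    # per gzipped dump file in S3, split into roughly BULK_PARALLELISM chunks.
    # Stale .PROCESSED marker objects are deleted first so every file is reloaded.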
    def compute_batches(ds=None, **kwargs):
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
        pieces = []
        for entity in ENTITIES:
            keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
            to_delete = list(filter(lambda key: key.endswith('.PROCESSED'), keys))
            hook.delete_objects(bucket=S3_BUCKET_NAME, keys=to_delete)
            for key in keys:
                if key.endswith('.gz'):
                    pieces.append((entity, key))

        def split_list(list_a, chunk_size):
            for i in range(0, len(list_a), chunk_size):
                yield {"files": list_a[i:i + chunk_size]}

        # Guard against a zero chunk size when there are fewer files than BULK_PARALLELISM.
        return list(split_list(pieces, max(1, len(pieces) // BULK_PARALLELISM)))

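    # Stream each assigned gzip file from S3 and index its JSON lines into
    # OpenSearch. The pod_override requests modest CPU/memory for every mapped
    # task instance on the Kubernetes executor.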
    @task(executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={
                                "cpu": "550m",
                                "memory": "256Mi"
                            }
                        )
                    )
                ]
            )
        )
    })
    def bulk_load(files: list[tuple[str, str]]):
        client = OpenSearch(
            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20
        )
        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})

        for (entity, key) in files:
            if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=S3_BUCKET_NAME):
                print(f'Skipping {entity}: {key}')
                continue
            print(f'Processing {entity}: {key}')
            s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
            with s3_obj.get()["Body"] as body:
                with gzip.GzipFile(fileobj=body) as gzipfile:
                    def _generate_data():
                        buff = io.BufferedReader(gzipfile)
                        for line in buff:
                            data = json.loads(line)
                            data['_index'] = entity
                            data['_id'] = data['local_identifier']
                            yield data

                    # Disable per-request success logging from the opensearch client.
                    logging.getLogger("opensearch").setLevel(logging.WARN)
                    succeeded = 0
                    failed = 0
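                    # parallel_bulk streams the generated actions in chunks; errors are
                    # collected instead of raised so the whole file is attempted before
                    # deciding whether to write the .PROCESSED marker.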
                    for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
                                                               raise_on_exception=False,
                                                               raise_on_error=False,
                                                               chunk_size=5000,
                                                               max_chunk_bytes=50 * 1024 * 1024,
                                                               timeout=180):
                        if success:
                            succeeded += 1
                        else:
                            print(item["index"]["error"])
                            failed += 1

                    if failed > 0:
                        print(f"There were {failed} errors.")
                    else:
                        # Mark the file as processed so reruns skip it.
                        hook.load_string(
                            "",
                            f"{key}.PROCESSED",
                            bucket_name=S3_BUCKET_NAME,
                            replace=False
                        )

                    if succeeded > 0:
                        print(f"Bulk-inserted {succeeded} items (parallel_bulk).")

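    # Make the newly loaded documents searchable: indexing ran with
    # refresh_interval disabled, so force a refresh of every index.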
    @task
    def close_indexes():
        client = OpenSearch(
            hosts=[{'host': OPENSEARCH_HOST, 'port': 9200}],
            http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWD),
            use_ssl=True,
            verify_certs=False,
            ssl_show_warn=False,
            pool_maxsize=20,
            timeout=180
        )
        for entity in ENTITIES:
            client.indices.refresh(entity)

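    # compute_batches runs as a classic PythonOperator; its return value (the list
    # of file batches) is pushed to XCom and expanded into mapped bulk_load tasks below.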
    parallel_batches = PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches)

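    # Pipeline order: create the indexes, compute the batches, fan out the mapped
    # bulk loads, then refresh the indexes. The unzip step is currently commented out.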
    chain(
        # unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME),
        create_indexes.override(task_id="create_indexes")(),
        parallel_batches,
        bulk_load.expand_kwargs(parallel_batches.output),
        close_indexes.override(task_id="close_indexes")()
    )

import_EOSC_graph()

@@ -12,6 +12,9 @@ mappings['datasources'] = {
"local_identifier": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"eoscId": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"name": {
|
||||
"fields": {
|
||||
"keyword": {
|
||||
|
|