initial stage

commit 00514edfbd (parent f79eb140eb)
Giambattista Bloisi, 2024-03-25 18:22:10 +01:00
2 changed files with 6 additions and 33 deletions

Changed file 1 of 2 (the EOSC catalog import DAG):

@@ -46,8 +46,7 @@ default_args = {
default_args=default_args,
tags=["lot1"],
)
def eosc_catalog_import():
def import_EOSC_catalog():
@task
def create_indexes():
client = OpenSearch(
@@ -135,8 +134,6 @@ def eosc_catalog_import():
if len(succeeded) > 0:
print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")
@task
def close_indexes():
client = OpenSearch(
@@ -159,4 +156,5 @@ def eosc_catalog_import():
close_indexes.override(task_id="close_indexes")()
)
eosc_catalog_import()
import_EOSC_catalog()
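
For orientation, a minimal sketch of the overall shape this first DAG file takes after the rename, assuming the @dag/@task layout visible in the hunks above and that the two tasks are chained as the closing lines suggest; the host, port, start_date, schedule and default_args values below are placeholders, not values from the repository.

import os
from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.helpers import chain
from opensearchpy import OpenSearch

OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "localhost")  # placeholder

default_args = {}  # placeholder; the real defaults are defined earlier in the file


@dag(
    start_date=datetime(2024, 1, 1),  # placeholder
    schedule=None,                    # placeholder
    default_args=default_args,
    tags=["lot1"],
)
def import_EOSC_catalog():  # renamed from eosc_catalog_import in this commit
    @task
    def create_indexes():
        client = OpenSearch(hosts=[{"host": OPENSEARCH_HOST, "port": 9200}])
        # ... create the catalog indexes with the configured mappings ...

    @task
    def close_indexes():
        client = OpenSearch(hosts=[{"host": OPENSEARCH_HOST, "port": 9200}])
        # ... finalize the indexes once the bulk inserts have completed ...

    chain(
        create_indexes.override(task_id="create_indexes")(),
        close_indexes.override(task_id="close_indexes")(),
    )


import_EOSC_catalog()  # register the DAG, matching the call at the bottom of the file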

Changed file 2 of 2 (the EOSC graph import DAG):

@@ -5,7 +5,6 @@ import io
import json
import logging
import os
import zipfile
from datetime import timedelta
from kubernetes.client import models as k8s
@@ -14,17 +13,15 @@ from airflow.decorators import dag
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain
from airflow.models import Variable
from opensearchpy import OpenSearch, helpers
from EOSC_indexes import mappings
from EOSC_entity_trasform import transform_entities
from common import strip_prefix
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu")
AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
OPENSEARCH_HOST = Variable.get("OPENSEARCH_URL", "opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local")
OPENSEARCH_URL = Variable.get("OPENSEARCH_URL",
@@ -55,27 +52,6 @@ default_args = {
tags=["lot1"],
)
def import_EOSC_graph():
@task
def unzip_to_s3(key: str, bucket: str):
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
with TemporaryDirectory() as dwl_dir:
with TemporaryDirectory() as tmp_dir:
archive = f'{dwl_dir}/{key}'
hook.download_file(key=key, bucket_name=bucket, local_path=dwl_dir, preserve_file_name=True,
use_autogenerated_subdir=False)
with zipfile.ZipFile(archive, 'r') as zip_ref:
zip_ref.extractall(tmp_dir)
for root, _, files in os.walk(tmp_dir):
for file in files:
if file == key:
continue
local_file_path = os.path.join(root, file)
hook.load_file(local_file_path, strip_prefix(local_file_path, tmp_dir), S3_BUCKET_NAME,
replace=True)
return ""
@task
def create_indexes():
client = OpenSearch(
@@ -116,7 +92,7 @@ def import_EOSC_graph():
})
def compute_batches(ds=None, **kwargs):
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
pieces = []
for entity in ENTITIES:
keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
@@ -159,7 +135,7 @@ def import_EOSC_graph():
ssl_show_warn=False,
pool_maxsize=20
)
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
for (entity, key) in files:
if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=S3_BUCKET_NAME):
@@ -225,7 +201,6 @@ def import_EOSC_graph():
parallel_batches = PythonOperator(task_id="compute_parallel_batches", python_callable=compute_batches)
chain(
# unzip_to_s3.override(task_id="unzip_to_s3")("dump.zip", S3_BUCKET_NAME),
create_indexes.override(task_id="create_indexes")(),
parallel_batches,
bulk_load.expand_kwargs(parallel_batches.output),
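
And a sketch of the batching pattern the second file keeps using after the connection-id rename (AWS_CONN_ID to S3_CONN_ID): compute_batches lists the per-entity keys on S3 and returns one kwargs dict per batch, which bulk_load consumes via dynamic task mapping. ENTITIES, the batch size, the schedule/start_date and the elided streaming_bulk body are placeholders, not values from the repository.

import os
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.helpers import chain

S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skgif-eosc-eu")
ENTITIES = ["products"]  # placeholder entity list


@dag(start_date=datetime(2024, 1, 1), schedule=None, tags=["lot1"])  # placeholders
def import_EOSC_graph_sketch():
    def compute_batches(**kwargs):
        # List every object under each entity prefix and group the keys into
        # fixed-size batches; each dict becomes one mapped bulk_load instance.
        hook = S3Hook(S3_CONN_ID, transfer_config_args={"use_threads": False})
        pieces = []
        for entity in ENTITIES:
            for key in hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f"{entity}/") or []:
                pieces.append((entity, key))
        return [{"files": pieces[i:i + 10]} for i in range(0, len(pieces), 10)]

    @task
    def bulk_load(files: list):
        hook = S3Hook(S3_CONN_ID, transfer_config_args={"use_threads": False})
        for entity, key in files:
            # Skip keys already flagged with a .PROCESSED marker, as in the hunk above.
            if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=S3_BUCKET_NAME):
                continue
            # ... download the file and feed it to opensearchpy helpers.streaming_bulk ...

    parallel_batches = PythonOperator(
        task_id="compute_parallel_batches", python_callable=compute_batches
    )
    chain(
        parallel_batches,
        bulk_load.expand_kwargs(parallel_batches.output),
    )


import_EOSC_graph_sketch()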