simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-12 15:58:23 +01:00
parent e2a5f3e90e
commit f5ef2d3754
1 changed files with 12 additions and 9 deletions

View File

@ -13,6 +13,7 @@ from airflow.decorators import dag
from airflow.decorators import task from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.file import TemporaryDirectory from airflow.utils.file import TemporaryDirectory
from airflow.utils.helpers import chain
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if") S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
@ -101,14 +102,16 @@ def skg_if_pipeline():
f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh', f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh',
verify=False) verify=False)
unzip_to_s3("dump.zip", S3_BUCKET_NAME) chain(
bulk_load("datasources") unzip_to_s3.override(task_id=f"unzip_to_s3")("dump.zip", S3_BUCKET_NAME),
bulk_load("grants") bulk_load.override(task_id=f"load_datasources")("datasources"),
bulk_load("organizations") bulk_load("grants"),
bulk_load("persons") bulk_load("organizations"),
bulk_load("products") bulk_load("persons"),
bulk_load("topics") bulk_load("products"),
bulk_load("topics"),
bulk_load("venues") bulk_load("venues")
)
skg_if_pipeline() skg_if_pipeline()