simple test DAG

This commit is contained in:
Giambattista Bloisi 2024-03-08 15:35:45 +01:00
parent 9682e09eb4
commit 1ad289e948
1 changed files with 3 additions and 1 deletions

View File

@ -17,13 +17,13 @@ S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "skg-if")
AWS_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
default_args = {
"execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
"retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
"retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}
@task
def unzip_to_s3(key: str, bucket: str):
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
@ -37,6 +37,7 @@ def unzip_to_s3(key: str, bucket: str):
with z.open(member.filename) as fd:
hook.load_file_obj(BytesIO(fd.read()), member.filename, S3_BUCKET_NAME)
@task
def bulk_load(entity: str):
session = requests.Session()
@ -73,6 +74,7 @@ def bulk_load(entity: str):
session.post(f'https://opensearch-cluster.lot1-opensearch-cluster.svc.cluster.local:9200/{entity}_index/_refresh',
verify=False)
with DAG(
dag_id="skg_if_pipeline",
schedule=None,