simple test DAG
This commit is contained in:
parent
c259c529bc
commit
031b11a3db
|
@ -34,7 +34,6 @@ def strip_prefix(s, p):
|
|||
return s
|
||||
|
||||
|
||||
@task
|
||||
def unzip_to_s3(key: str, bucket: str):
|
||||
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
|
||||
|
||||
|
@ -54,7 +53,6 @@ def unzip_to_s3(key: str, bucket: str):
|
|||
return ""
|
||||
|
||||
|
||||
@task
|
||||
def bulk_load(entity: str):
|
||||
session = requests.Session()
|
||||
session.auth = ("admin", "admin")
|
||||
|
@ -91,6 +89,11 @@ def bulk_load(entity: str):
|
|||
verify=False)
|
||||
return ""
|
||||
|
||||
@task
|
||||
def do_all(entity: str):
|
||||
unzip_to_s3("dump.zip", S3_BUCKET_NAME)
|
||||
bulk_load("research_product")
|
||||
|
||||
|
||||
with DAG(
|
||||
dag_id="skg_if_pipeline",
|
||||
|
@ -100,12 +103,9 @@ with DAG(
|
|||
default_args=default_args,
|
||||
tags=["example", "async", "s3"],
|
||||
) as dag:
|
||||
unzip_to_s3 = unzip_to_s3("dump.zip", S3_BUCKET_NAME)
|
||||
bulk_load_rp = bulk_load("research_product")
|
||||
all_done = do_all("")
|
||||
|
||||
chain(unzip_to_s3,
|
||||
bulk_load_rp
|
||||
)
|
||||
chain(all_done)
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue