simple test DAG
This commit is contained in:
parent
636a4e38e9
commit
f1cec0cfeb
|
@ -46,11 +46,6 @@ def strip_prefix(s, p):
|
|||
return s
|
||||
|
||||
|
||||
def split_list(list_a, chunk_size):
|
||||
for i in range(0, len(list_a), chunk_size):
|
||||
yield list_a[i:i + chunk_size]
|
||||
|
||||
|
||||
@dag(
|
||||
schedule=None,
|
||||
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
|
||||
|
@ -113,9 +108,13 @@ def skg_if_pipeline():
|
|||
hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
|
||||
keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
|
||||
for key in keys:
|
||||
pieces.append((entity, key))
|
||||
pieces.append({entity, key})
|
||||
|
||||
return list(split_list(pieces, BULK_PARALLELISM))
|
||||
def split_list(list_a, chunk_size):
    """Yield consecutive chunks of ``list_a``, each wrapped as ``{"files": chunk}``.

    Args:
        list_a: Sequence to partition. It is only sliced, never mutated.
        chunk_size: Maximum number of items per chunk. May be fractional
            (for example the result of a true division); it is rounded up
            to a whole number and clamped to at least 1.

    Yields:
        dict: ``{"files": <slice of list_a>}`` for each chunk, in order.
        Nothing is yielded when ``list_a`` is empty.
    """
    import math  # local import: the file's import block is outside this view

    # range() raises TypeError for a float step and ValueError for a step of
    # 0, so normalise the requested size to a positive integer first.
    step = max(1, math.ceil(chunk_size))
    for start in range(0, len(list_a), step):
        yield {"files": list_a[start:start + step]}
|
||||
|
||||
return list(split_list(pieces, len(pieces)/BULK_PARALLELISM))
|
||||
|
||||
@task
|
||||
def bulk_load(files: list[(str, str)]):
|
||||
|
|
Loading…
Reference in New Issue