simple test DAG

Giambattista Bloisi 2024-03-21 10:45:58 +01:00
parent 10fedb06f1
commit d660233e8e
1 changed file with 19 additions and 5 deletions

@@ -36,7 +36,7 @@ BULK_PARALLELISM = 10
 #
 default_args = {
-    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
     "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
     "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
 }
@@ -51,6 +51,7 @@ def strip_prefix(s, p):
 @dag(
     schedule=None,
+    dagrun_timeout=None,
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     default_args=default_args,
@@ -126,10 +127,12 @@ def skg_if_pipeline():
     })
     def compute_batches(ds=None, **kwargs):
+        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
         pieces = []
         for entity in ENTITIES:
-            hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
             keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
+            to_delete = filter(lambda key: key.endswith('.PROCESSED'), keys)
+            hook.delete_objects(bucket=S3_BUCKET_NAME, keys=to_delete)
             for key in keys:
                 if key.endswith('.gz'):
                     pieces.append((entity, key))
@@ -140,7 +143,7 @@ def skg_if_pipeline():
         return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
-    @task(execution_timeout=timedelta(days=3), executor_config={
+    @task(executor_config={
         "pod_override": k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
@@ -169,7 +172,10 @@ def skg_if_pipeline():
         hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
         for (entity, key) in files:
-            print(f'{entity}: {key}')
+            if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=S3_BUCKET_NAME):
+                print(f'Skipping {entity}: {key}')
+                continue
+            print(f'Processing {entity}: {key}')
             s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
             with s3_obj.get()["Body"] as body:
                 with gzip.GzipFile(fileobj=body) as gzipfile:
@@ -188,7 +194,8 @@ def skg_if_pipeline():
                     for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
                                                                raise_on_exception=False,
                                                                raise_on_error=False,
-                                                               chunk_size=5000, max_chunk_bytes=50 * 1024 * 1024,
+                                                               chunk_size=5000,
+                                                               max_chunk_bytes=50 * 1024 * 1024,
                                                                timeout=180):
                         if success:
                             succeeded = succeeded + 1
@@ -198,6 +205,13 @@ def skg_if_pipeline():
                     if failed > 0:
                         print(f"There were {failed} errors:")
+                    else:
+                        hook.load_string(
+                            "",
+                            f"{key}.PROCESSED",
+                            bucket_name=S3_BUCKET_NAME,
+                            replace=False
+                        )
                     if succeeded > 0:
                         print(f"Bulk-inserted {succeeded} items (streaming_bulk).")