diff --git a/airflow/dags/import_skg_if.py b/airflow/dags/import_skg_if.py
index 186cb9d..ea5206b 100644
--- a/airflow/dags/import_skg_if.py
+++ b/airflow/dags/import_skg_if.py
@@ -36,7 +36,7 @@ BULK_PARALLELISM = 10
 #
 default_args = {
-    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
+    "execution_timeout": timedelta(days=EXECUTION_TIMEOUT),
     "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 1)),
     "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
 }
 
@@ -51,6 +51,7 @@ def strip_prefix(s, p):
 
 @dag(
     schedule=None,
+    dagrun_timeout=None,
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     default_args=default_args,
@@ -126,10 +127,12 @@ def skg_if_pipeline():
     })
     def compute_batches(ds=None, **kwargs):
+        hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
         pieces = []
         for entity in ENTITIES:
-            hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
             keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
+            to_delete = filter(lambda key: key.endswith('.PROCESSED'), keys)
+            hook.delete_objects(bucket=S3_BUCKET_NAME, keys=to_delete)
             for key in keys:
                 if key.endswith('.gz'):
                     pieces.append((entity, key))
 
@@ -140,7 +143,7 @@ def skg_if_pipeline():
         return list(split_list(pieces, len(pieces)//BULK_PARALLELISM))
 
-    @task(execution_timeout=timedelta(days=3), executor_config={
+    @task(executor_config={
         "pod_override": k8s.V1Pod(
             spec=k8s.V1PodSpec(
                 containers=[
@@ -169,7 +172,10 @@ def skg_if_pipeline():
 
         hook = S3Hook(AWS_CONN_ID, transfer_config_args={'use_threads': False})
         for (entity, key) in files:
-            print(f'{entity}: {key}')
+            if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=S3_BUCKET_NAME):
+                print(f'Skipping {entity}: {key}')
+                continue
+            print(f'Processing {entity}: {key}')
             s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
             with s3_obj.get()["Body"] as body:
                 with gzip.GzipFile(fileobj=body) as gzipfile:
@@ -188,7 +194,8 @@ def skg_if_pipeline():
         for success, item in helpers.parallel_bulk(client, actions=_generate_data(),
                                                    raise_on_exception=False,
                                                    raise_on_error=False,
-                                                   chunk_size=5000, max_chunk_bytes=50 * 1024 * 1024,
+                                                   chunk_size=5000,
+                                                   max_chunk_bytes=50 * 1024 * 1024,
                                                    timeout=180):
             if success:
                 succeeded = succeeded + 1
@@ -198,6 +205,13 @@ def skg_if_pipeline():
 
         if failed > 0:
             print(f"There were {failed} errors:")
+        else:
+            hook.load_string(
+                "",
+                f"{key}.PROCESSED",
+                bucket_name=S3_BUCKET_NAME,
+                replace=False
+            )
 
         if succeeded > 0:
             print(f"Bulk-inserted {succeeded} items (streaming_bulk).")