code-infrastructure-lab/workflow/dnet/S3_untar.py

101 lines
3.9 KiB
Python

import os
import tarfile
import time
from datetime import timedelta
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.python import get_current_context
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from botocore.exceptions import ClientError
import dag_utils
# Defaults applied to every task of the DAG declared in this module. Each value
# is read from the environment variable named in the lookup and falls back to
# the literal next to it when that variable is not set.
EXECUTION_TIMEOUT = int(os.environ.get("EXECUTION_TIMEOUT", 6))  # consumed below as a number of days
default_args = dict(
    execution_timeout=timedelta(days=EXECUTION_TIMEOUT),
    retries=int(os.environ.get("DEFAULT_TASK_RETRIES", 1)),
    retry_delay=timedelta(seconds=int(os.environ.get("DEFAULT_RETRY_DELAY_SECONDS", 60))),
)
def load_file_obj_with_backoff(hook: S3Hook, fileobj, key: str, bucket: str, replace: bool,
                               *, initial_delay: float = 10, delay_incr: float = 10,
                               max_delay: float = 60) -> bool:
    """Upload ``fileobj`` to ``s3://bucket/key``, retrying failed attempts with a growing delay.

    An attempt is made, and on a ``ClientError`` the function sleeps for the
    current delay, increases the delay by ``delay_incr`` and tries again, for as
    long as the increased delay stays below ``max_delay``. With the defaults
    this is at most five attempts, separated by waits of 10, 20, 30 and 40
    seconds. At least one attempt is always made.

    Args:
        hook: S3 hook whose ``load_file_obj`` performs the upload.
        fileobj: File-like object handed to the hook unchanged on every attempt.
        key: Destination object key.
        bucket: Destination bucket name.
        replace: Forwarded to the hook as its ``replace`` keyword.
        initial_delay: Seconds to wait after the first failed attempt.
        delay_incr: Seconds added to the wait after every failed attempt.
        max_delay: Exclusive upper bound of a single wait; once the next wait
            would reach it, the pending error is raised instead of retrying.

    Returns:
        Whatever ``hook.load_file_obj`` returns for the successful attempt.

    Raises:
        ValueError: If ``delay_incr`` is not positive (retries would never stop).
        ClientError: Immediately when the error code is ``NoSuchBucket``, or
            the error of the last attempt once all retries are used up.

    NOTE(review): the same ``fileobj`` is passed again on each retry without
    being rewound here. If a failed attempt already consumed part of a
    non-seekable stream, a retry may upload incomplete data -- confirm how the
    caller's streams behave.
    """
    if delay_incr <= 0:
        raise ValueError("delay_incr must be positive")

    delay = initial_delay
    while True:
        try:
            return hook.load_file_obj(fileobj,
                                      key,
                                      bucket,
                                      replace=replace)
        except ClientError as err:
            code = err.response.get('Error', {}).get('Code', '')
            if code in ['NoSuchBucket']:
                # A missing bucket will not appear by waiting; fail right away.
                print(f"Error: {code}. Check s3path: s3://{bucket}/{key}")
                raise
            next_delay = delay + delay_incr
            if next_delay >= max_delay:
                # Retries exhausted: surface the failure instead of returning
                # as if the upload had succeeded.
                raise
            time.sleep(delay)
            delay = next_delay
@dag(
    dag_id="s3_untar",
    dag_display_name="S3 streaming untar",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    default_args=default_args,
    params={
        "S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection of S3 endpoint"),
        "src_bucket": Param(dag_utils.get_default_bucket(), type='string',
                            description="Override S3 default bucket for source"),
        "src_key": Param("", type='string', description="File to untar"),
        "dst_bucket": Param(dag_utils.get_default_bucket(), type='string',
                            description="Override S3 default bucket for destination"),
        "dst_key_prefix": Param("", type='string', description="Key prefix for unarchived files"),
    },
    tags=["s3", "tools"],
)
def s3_untar():
    """Unpack a tar archive stored in S3 into individual S3 objects.

    The archive ``src_key`` in ``src_bucket`` is read as a stream and every
    regular file it contains is uploaded to ``dst_bucket`` under
    ``dst_key_prefix``. Keys that already exist in the destination are skipped.
    """

    @task
    def untar():
        """Stream the source archive and upload each regular file member."""
        params = get_current_context()["params"]
        # NOTE(review): threaded transfers are switched off, presumably because
        # the members come from a forward-only stream -- confirm.
        hook = S3Hook(params["S3_CONN_ID"], transfer_config_args={'use_threads': False})
        src_bucket = params['src_bucket']
        dst_bucket = params['dst_bucket']
        dst_key_prefix = os.path.normpath(params["dst_key_prefix"])

        # normpath("") yields ".", and destination keys built from "./name"
        # normalise to plain "name". Listing with the literal prefix "./" would
        # therefore never match them, so an empty prefix must list with "".
        list_prefix = "" if dst_key_prefix == "." else dst_key_prefix + "/"

        print(f"Existing keys with prefix: {list_prefix}")
        existing_keys = set()
        # `or []` guards against a hook that reports "no keys" as None.
        for existing in hook.list_keys(bucket_name=dst_bucket, prefix=list_prefix) or []:
            print(f"{existing}")
            existing_keys.add(existing)

        s3_obj = hook.get_key(params["src_key"], bucket_name=src_bucket)
        body = s3_obj.get()["Body"]
        try:
            # 'r|*' reads the archive as a forward-only stream with transparent
            # compression, so the object is never downloaded as a whole.
            with tarfile.open(fileobj=body, mode='r|*') as tar:
                for member in tar:
                    # Ignore directories, links, devices, fifos, etc.
                    if (not member.isfile()) or member.name.endswith('/'):
                        print(f"Skipping {member.name}: is not a file")
                        continue

                    dst_key = os.path.normpath(dst_key_prefix + "/" + member.name)

                    # Member names come from the archive; ".." components must
                    # not be able to place objects outside the destination prefix.
                    if list_prefix:
                        escapes_prefix = not dst_key.startswith(list_prefix)
                    else:
                        escapes_prefix = dst_key == ".." or dst_key.startswith("../")
                    if escapes_prefix:
                        print(f"Skipping {member.name}: resolves to {dst_key}, outside the destination prefix")
                        continue

                    if dst_key in existing_keys:
                        print(f"Skipping {member.name}: already exists as {dst_key}")
                        continue

                    print(f"Extracting {member.name} to {dst_key}")
                    fileobj = tar.extractfile(member)
                    # Report the member as non-seekable: the underlying archive
                    # stream can only be read forwards.
                    fileobj.seekable = lambda: False
                    load_file_obj_with_backoff(hook, fileobj,
                                               dst_key,
                                               dst_bucket,
                                               replace=True)
        finally:
            # tarfile does not close a file object it was handed; release the
            # S3 response stream explicitly.
            body.close()

    untar()


s3_untar()