initial stage

This commit is contained in:
Giambattista Bloisi 2024-07-01 18:44:11 +02:00
parent 2a54a3e325
commit 0aba5ef69f
1 changed file with 50 additions and 5 deletions

@@ -1,6 +1,7 @@
 import os
 from datetime import timedelta
+import boto3
 import pendulum
 import requests
 from airflow.decorators import dag
@@ -34,18 +35,62 @@ default_args = {
 def openaire_to_s3():
     @task
     def download(**context):
-        conn = BaseHook.get_connection("openaire_default")
+        http_conn = BaseHook.get_connection("openaire_default")
         session = requests.Session()
         session.headers['Connection'] = 'close'
-        session.auth = (conn.login, conn.password)
+        session.auth = (http_conn.login, http_conn.password)
         hook = S3Hook(S3_CONN_ID, transfer_config_args={'multipart_chunksize': 100 * (1024*1024),
                                                         'max_io_queue': 10,
                                                         'io_chunksize': 10*1024*1024,
                                                         'use_threads': False})
+        url = "https://" + http_conn.host + "/data/graph/" + context["params"]["file"]
+        bucket_name = context["params"]["dst_bucket"]
+        s3_key = "/data/graph/" + context["params"]["file"]
+        s3 = hook.get_conn()
+        parts = []
+        current_size = 0
+        part_number = 1
+        chunk_size = 0
-        with session.get("https://" + conn.host + "/data/graph/" + context["params"]["file"], stream=True) as r:
-            r.raise_for_status()
-            hook.load_file_obj(r.raw, "/data/graph/" + context["params"]["file"], bucket_name=context["params"]["dst_bucket"], replace=True, encrypt=False)
+        hook.delete_objects(bucket=bucket_name,
+                            keys=[s3_key])
+        response = s3.create_multipart_upload(Bucket=bucket_name,
+                                              Key=s3_key)
+        upload_id = response['UploadId']
+        tries = 10
+        for i in range(tries):
+            try:
+                with session.get(url,
+                                 headers={'Range': 'bytes=%d-' % current_size},
+                                 stream=True) as r:
+                    if chunk_size == 0:
+                        chunk_size = min(int(r.headers['Content-length']) // (10000-1), 33*1024*1024)
+                    for chunk in r.iter_content(chunk_size=chunk_size):
+                        if chunk:
+                            # Upload part by part to S3
+                            response = s3.upload_part(
+                                Body=chunk,
+                                Bucket=bucket_name,
+                                Key=s3_key,
+                                PartNumber=part_number,
+                                UploadId=upload_id
+                            )
+                            parts.append({'PartNumber': part_number, 'ETag': response['ETag']})
+                            part_number += 1
+                            current_size += len(chunk)
+            except Exception as e:
+                if i < tries - 1:
+                    continue
+                else:
+                    raise
+        s3.complete_multipart_upload(
+            Bucket=bucket_name,
+            Key=s3_key,
+            UploadId=upload_id,
+            MultipartUpload={'Parts': parts}
+        )
     download()
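
For reference, a condensed sketch of the pattern the new task implements, streaming an HTTP download into an S3 multipart upload and resuming interrupted transfers with Range requests, written against a plain boto3 client plus requests rather than the Airflow S3Hook wiring above. The stream_to_s3 helper name is hypothetical; the break after a clean pass and the abort_multipart_upload cleanup are assumptions added for illustration and are not part of the committed code. Note that S3 requires every part except the last to be at least 5 MiB, with at most 10,000 parts per upload.

import boto3
import requests


def stream_to_s3(url, bucket, key, auth=None, part_size=32 * 1024 * 1024, max_retries=10):
    """Hypothetical sketch: stream an HTTP resource into S3 via multipart upload,
    resuming an interrupted download with HTTP Range requests.
    part_size must stay >= 5 MiB (S3 minimum for all parts except the last)."""
    s3 = boto3.client("s3")
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = upload["UploadId"]
    parts, part_number, offset = [], 1, 0
    try:
        for attempt in range(max_retries):
            try:
                # Ask the server to resume from the last byte already uploaded.
                with requests.get(url, auth=auth, stream=True,
                                  headers={"Range": "bytes=%d-" % offset}) as r:
                    r.raise_for_status()
                    for chunk in r.iter_content(chunk_size=part_size):
                        if not chunk:
                            continue
                        resp = s3.upload_part(Body=chunk, Bucket=bucket, Key=key,
                                              PartNumber=part_number, UploadId=upload_id)
                        parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
                        part_number += 1
                        offset += len(chunk)
                break  # full body consumed, stop retrying (assumption, not in the commit)
            except requests.RequestException:
                if attempt == max_retries - 1:
                    raise
        s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id,
                                     MultipartUpload={"Parts": parts})
    except Exception:
        # Avoid leaving orphaned, billable parts behind (assumption, not in the commit).
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise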