From 0aba5ef69fa17ecfebf8074bb7273c4558f7db68 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Mon, 1 Jul 2024 18:44:11 +0200
Subject: [PATCH] initial stage

---
 airflow/dags/openaire_to_S3.py | 55 ++++++++++++++++++++++++++++++----
 1 file changed, 50 insertions(+), 5 deletions(-)

diff --git a/airflow/dags/openaire_to_S3.py b/airflow/dags/openaire_to_S3.py
index 17f51d5..4b50d0c 100644
--- a/airflow/dags/openaire_to_S3.py
+++ b/airflow/dags/openaire_to_S3.py
@@ -1,6 +1,7 @@
 import os
 from datetime import timedelta
 
+import boto3
 import pendulum
 import requests
 from airflow.decorators import dag
@@ -34,18 +35,62 @@ default_args = {
 def openaire_to_s3():
     @task
     def download(**context):
-        conn = BaseHook.get_connection("openaire_default")
+        http_conn = BaseHook.get_connection("openaire_default")
         session = requests.Session()
         session.headers['Connection'] = 'close'
-        session.auth = (conn.login, conn.password)
+        session.auth = (http_conn.login, http_conn.password)
         hook = S3Hook(S3_CONN_ID, transfer_config_args={'multipart_chunksize': 100 * (1024*1024),
                                                         'max_io_queue':10,
                                                         'io_chunksize':10*1024*1024,
                                                         'use_threads': False})
+        url = "https://" + http_conn.host + "/data/graph/" + context["params"]["file"]
+        bucket_name = context["params"]["dst_bucket"]
+        s3_key = "/data/graph/" + context["params"]["file"]
+        s3 = hook.get_conn()
+        parts = []
+        current_size = 0
+        part_number = 1
+        chunk_size = 0
 
-        with session.get("https://" + conn.host + "/data/graph/" + context["params"]["file"], stream=True) as r:
-            r.raise_for_status()
-            hook.load_file_obj(r.raw, "/data/graph/" + context["params"]["file"], bucket_name=context["params"]["dst_bucket"], replace=True, encrypt=False)
+        hook.delete_objects(bucket=bucket_name,
+                            keys=[s3_key])
+        response = s3.create_multipart_upload(Bucket=bucket_name,
+                                              Key=s3_key)
+        upload_id = response['UploadId']
+
+        tries = 10
+        for i in range(tries):
+            try:
+                with session.get(url,
+                                 headers={'Range': 'bytes=%d-' % current_size},
+                                 stream=True) as r:
+                    if chunk_size == 0:
+                        chunk_size = min(int(r.headers['Content-length']) // (10000-1), 33*1024*1024)
+                    for chunk in r.iter_content(chunk_size=chunk_size):
+                        if chunk:
+                            # Upload part by part to S3
+                            response = s3.upload_part(
+                                Body=chunk,
+                                Bucket=bucket_name,
+                                Key=s3_key,
+                                PartNumber=part_number,
+                                UploadId=upload_id
+                            )
+                            parts.append({'PartNumber': part_number, 'ETag': response['ETag']})
+                            part_number += 1
+                            current_size += len(chunk)
+            except Exception as e:
+                if i < tries - 1:
+                    continue
+                else:
+                    raise
+
+        s3.complete_multipart_upload(
+            Bucket=bucket_name,
+            Key=s3_key,
+            UploadId=upload_id,
+            MultipartUpload={'Parts': parts}
+        )
 
 
     download()
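
Note for reviewers: the hunk above replaces the single `load_file_obj` call with a
hand-rolled S3 multipart upload so the HTTP transfer can resume from `current_size`
(via a `Range` header) after a network error, instead of restarting the whole
download. Below is a minimal standalone sketch of the same pattern, outside the
DAG, to make the moving parts easier to follow. It is an illustration, not the
patch's code: the function name `upload_resumable` is made up, the part sizing
uses the 10,000-part / 5 MiB S3 limits rather than the hunk's
`min(..., 33*1024*1024)`, and it adds a `break` on success plus an
`abort_multipart_upload` cleanup that the hunk does not perform.

import boto3
import requests

def upload_resumable(url, bucket, key, auth=None, tries=10):
    """Stream `url` into s3://bucket/key, resuming after network errors."""
    session = requests.Session()
    session.auth = auth
    s3 = boto3.client("s3")
    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
    parts = []       # completed parts, needed by complete_multipart_upload
    offset = 0       # bytes uploaded so far; doubles as the resume offset
    chunk_size = 0   # fixed once the total size is known

    try:
        for attempt in range(tries):
            try:
                # Resume from `offset`; assumes the server honours Range
                # requests (it answers 206 with only the remaining bytes).
                with session.get(url, headers={"Range": "bytes=%d-" % offset},
                                 stream=True) as r:
                    r.raise_for_status()
                    if chunk_size == 0:
                        total = offset + int(r.headers["Content-Length"])
                        # S3 allows at most 10,000 parts per upload and
                        # requires >= 5 MiB per part (except the last one).
                        chunk_size = max(total // 9999, 5 * 1024 * 1024)
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        part_number = len(parts) + 1
                        resp = s3.upload_part(Body=chunk, Bucket=bucket,
                                              Key=key, PartNumber=part_number,
                                              UploadId=upload_id)
                        parts.append({"PartNumber": part_number,
                                      "ETag": resp["ETag"]})
                        offset += len(chunk)
                break  # stream fully consumed: stop retrying
            except requests.RequestException:
                if attempt == tries - 1:
                    raise
        s3.complete_multipart_upload(Bucket=bucket, Key=key,
                                     UploadId=upload_id,
                                     MultipartUpload={"Parts": parts})
    except Exception:
        # Abort so the orphaned parts do not keep accruing storage costs.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise

The ordered `parts` list is the crux of the pattern: `complete_multipart_upload`
stitches the object together from exactly the `PartNumber`/`ETag` pairs it is
given, and any part that was uploaded but not listed there is discarded.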