initial stage
This commit is contained in:
parent
1e3d7595ea
commit
aa38362f26
|
@ -1,5 +1,4 @@
|
||||||
import os
|
import os
|
||||||
import urllib.request
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
import pendulum
|
import pendulum
|
||||||
|
@ -9,10 +8,6 @@ from airflow.decorators import task
|
||||||
from airflow.hooks.base import BaseHook
|
from airflow.hooks.base import BaseHook
|
||||||
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||||
|
|
||||||
# Make requests use HTTP 1.0 to disable chunking
|
|
||||||
from http.client import HTTPConnection
|
|
||||||
HTTPConnection._http_vsn = 10
|
|
||||||
HTTPConnection._http_vsn_str = 'HTTP/1.0'
|
|
||||||
|
|
||||||
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
|
S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
|
||||||
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
|
||||||
|
@ -41,6 +36,7 @@ def openaire_to_s3():
|
||||||
def download(**context):
|
def download(**context):
|
||||||
conn = BaseHook.get_connection("openaire_default")
|
conn = BaseHook.get_connection("openaire_default")
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
|
session.headers['Connection'] = 'close'
|
||||||
session.auth = (conn.login, conn.password)
|
session.auth = (conn.login, conn.password)
|
||||||
hook = S3Hook(S3_CONN_ID, transfer_config_args={'multipart_chunksize': 100 * (1024*1024),
|
hook = S3Hook(S3_CONN_ID, transfer_config_args={'multipart_chunksize': 100 * (1024*1024),
|
||||||
'max_io_queue':10,
|
'max_io_queue':10,
|
||||||
|
|
Loading…
Reference in New Issue