initial stage
parent 26c2e3eaad
commit 185ca78f71
@@ -20,7 +20,7 @@ from opensearchpy import OpenSearch, helpers
 from EOSC_indexes import mappings
 from EOSC_entity_trasform import transform_entities
 
-S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME", "eosc-portal-import")
+
 S3_CONN_ID = os.getenv("S3_CONN_ID", "s3_conn")
 EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))
 
@@ -40,6 +40,10 @@ default_args = {
     start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
     catchup=False,
     default_args=default_args,
+    params={
+        "S3_CONN_ID": "s3_conn",
+        "EOSC_CATALOG_BUCKET": "eosc-portal-import"
+    },
     tags=["lot1"]
 )
 def import_EOSC_graph():
@@ -87,10 +91,10 @@ def import_EOSC_graph():
         hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
         pieces = []
         for entity in ENTITIES:
-            keys = hook.list_keys(bucket_name=S3_BUCKET_NAME, prefix=f'{entity}/')
+            keys = hook.list_keys(bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"], prefix=f'{entity}/')
             to_delete = list(filter(lambda key: key.endswith('.PROCESSED'), keys))
             for obj in to_delete:
-                hook.get_conn().delete_object(Bucket=S3_BUCKET_NAME, Key=obj)
+                hook.get_conn().delete_object(Bucket=kwargs["params"]["EOSC_CATALOG_BUCKET"], Key=obj)
             for key in keys:
                 if key.endswith('.gz'):
                     pieces.append((entity, key))
@@ -118,7 +122,7 @@ def import_EOSC_graph():
            )
        )
    })
-    def bulk_load(files: list[(str, str)]):
+    def bulk_load(files: list[(str, str)], **kwargs):
         conn = BaseHook.get_connection('opensearch_default')
         client = OpenSearch(
             hosts=[{'host': conn.host, 'port': conn.port}],
@@ -130,11 +134,11 @@ def import_EOSC_graph():
         hook = S3Hook(S3_CONN_ID, transfer_config_args={'use_threads': False})
 
         for (entity, key) in files:
-            if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=S3_BUCKET_NAME):
+            if hook.check_for_key(key=f"{key}.PROCESSED", bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"]):
                 print(f'Skipping {entity}: {key}')
                 continue
             print(f'Processing {entity}: {key}')
-            s3_obj = hook.get_key(key, bucket_name=S3_BUCKET_NAME)
+            s3_obj = hook.get_key(key, bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"])
             with s3_obj.get()["Body"] as body:
                 with gzip.GzipFile(fileobj=body) as gzipfile:
                     def _generate_data():
@@ -169,7 +173,7 @@ def import_EOSC_graph():
             hook.load_string(
                 "",
                 f"{key}.PROCESSED",
-                bucket_name=S3_BUCKET_NAME,
+                bucket_name=kwargs["params"]["EOSC_CATALOG_BUCKET"],
                 replace=False
             )
 
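The change replaces the module-level S3_BUCKET_NAME constant with a DAG-level param read inside the tasks. This works because Airflow's TaskFlow API injects the run context into task callables that accept **kwargs, so the params declared on @dag(...) become available as kwargs["params"]. A minimal sketch of that mechanism, assuming Airflow 2.x; the DAG and task names (params_demo, read_bucket) are illustrative and not part of this repository:

import pendulum
from airflow.decorators import dag, task


@dag(
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    # DAG-level params; individual runs can override them when triggered.
    params={"EOSC_CATALOG_BUCKET": "eosc-portal-import"},
)
def params_demo():
    @task
    def read_bucket(**kwargs):
        # A task that accepts **kwargs receives the Airflow context,
        # including the resolved "params" mapping for this run.
        bucket = kwargs["params"]["EOSC_CATALOG_BUCKET"]
        print(f"Using bucket: {bucket}")

    read_bucket()


params_demo()

When a run is triggered with a different EOSC_CATALOG_BUCKET value, the override shows up in kwargs["params"] without any code change, which is what makes the bucket configurable per run after this commit.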