initial stage

This commit is contained in:
Giambattista Bloisi 2024-08-01 10:46:37 +02:00
parent 9435d23083
commit f22f89b54c
1 changed files with 5 additions and 10 deletions

View File

@ -47,13 +47,11 @@ def import_s3_openaire_dump():
@task_group @task_group
def load_and_map_entity(entity: str): def load_and_map_entity(entity: str):
kwargs = get_current_context()
@task @task
def compute_batches(): def compute_batches():
nonlocal entity nonlocal entity
kwargs = get_current_context() kwargs = get_current_context()
return [(entity, '1'), (entity, '2')] return [[(entity, '1'), (entity, '2')], [], []]
@task(executor_config={ @task(executor_config={
"pod_override": k8s.V1Pod( "pod_override": k8s.V1Pod(
@ -72,24 +70,21 @@ def import_s3_openaire_dump():
) )
) )
}) })
def bulk_load(files: list[(str, str)], **kwargs): def parallel_load(files: list[(str, str)], **kwargs):
kwargs = get_current_context() kwargs = get_current_context()
print(files) print(files)
if entity in kwargs["params"]["ENTITIES"]: parallel_load.expand(compute_batches())
bulk_load(compute_batches())
else:
EmptyOperator(task_id=f"${entity}_skipped")
@task @task
def merge_curation_db(**kwargs): def merge_curation_db(**kwargs):
pass pass
@task(task_display_name="Cleanup missing curated entities") @task
def delete_missing_curated(**kwargs): def delete_missing_curated(**kwargs):
pass pass
@task(task_display_name="Refresh indexes and link them to aliases") @task
def close_indexes(**kwargs): def close_indexes(**kwargs):
pass pass