code-infrastructure-lab/workflow/dnet/build_openaire_graph.py

50 lines
1.6 KiB
Python
Raw Normal View History

from __future__ import annotations
import pendulum
from airflow.decorators import dag
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import dag_utils
@dag(
dag_id="build_openaire_graph",
dag_display_name="Build the OpenAIRE graph",
params={
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint")
},
tags=["openaire"]
)
def build_new_graph():
chain(TriggerDagRunOperator(
task_id="dedup",
trigger_dag_id="dedup_graph",
wait_for_completion=True),
TriggerDagRunOperator(
task_id="consistency",
trigger_dag_id="consistency_graph",
wait_for_completion=True
# conf={
# "file": "{{ task_instance.xcom_pull(task_ids='check_new_dump_availability', key='file_path') }}",
# "dst_bucket": "{{ dag_run.conf.get('S3_BUCKET') }}",
# }
),
TriggerDagRunOperator(
task_id="orcid_enrichment",
trigger_dag_id="orcid_enrichment_graph",
wait_for_completion=True
# conf={
# "src_key": "/data/graph/{{ task_instance.xcom_pull(task_ids='check_new_dump_availability', key='file_path') }}",
# "src_bucket": "{{ dag_run.conf.get('S3_BUCKET') }}",
# "dst_key_prefix": "/data/graph/{{ task_instance.xcom_pull(task_ids='check_new_dump_availability', key='timestamp') }}",
# "dst_bucket": "{{ dag_run.conf.get('S3_BUCKET') }}"
# }
)
)
build_new_graph()