Add workflow with all graph construction steps
This commit is contained in:
parent
aae37058f7
commit
73e78d6877
|
@ -0,0 +1,49 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pendulum
|
||||||
|
from airflow.decorators import dag
|
||||||
|
from airflow.models.baseoperator import chain
|
||||||
|
from airflow.models.param import Param
|
||||||
|
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||||
|
|
||||||
|
import dag_utils
|
||||||
|
|
||||||
|
@dag(
|
||||||
|
dag_id="build_openaire_graph",
|
||||||
|
dag_display_name="Build the OpenAIRE graph",
|
||||||
|
params={
|
||||||
|
"S3_CONN_ID": Param("s3_conn", type='string', description="Airflow connection for S3 endpoint")
|
||||||
|
},
|
||||||
|
tags=["openaire"]
|
||||||
|
)
|
||||||
|
def build_new_graph():
|
||||||
|
chain(TriggerDagRunOperator(
|
||||||
|
task_id="dedup",
|
||||||
|
trigger_dag_id="dedup_graph",
|
||||||
|
wait_for_completion=True),
|
||||||
|
|
||||||
|
TriggerDagRunOperator(
|
||||||
|
task_id="consistency",
|
||||||
|
trigger_dag_id="consistency_graph",
|
||||||
|
wait_for_completion=True
|
||||||
|
|
||||||
|
# conf={
|
||||||
|
# "file": "{{ task_instance.xcom_pull(task_ids='check_new_dump_availability', key='file_path') }}",
|
||||||
|
# "dst_bucket": "{{ dag_run.conf.get('S3_BUCKET') }}",
|
||||||
|
# }
|
||||||
|
),
|
||||||
|
TriggerDagRunOperator(
|
||||||
|
task_id="orcid_enrichment",
|
||||||
|
trigger_dag_id="orcid_enrichment_graph",
|
||||||
|
wait_for_completion=True
|
||||||
|
|
||||||
|
# conf={
|
||||||
|
# "src_key": "/data/graph/{{ task_instance.xcom_pull(task_ids='check_new_dump_availability', key='file_path') }}",
|
||||||
|
# "src_bucket": "{{ dag_run.conf.get('S3_BUCKET') }}",
|
||||||
|
# "dst_key_prefix": "/data/graph/{{ task_instance.xcom_pull(task_ids='check_new_dump_availability', key='timestamp') }}",
|
||||||
|
# "dst_bucket": "{{ dag_run.conf.get('S3_BUCKET') }}"
|
||||||
|
# }
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
build_new_graph()
|
Loading…
Reference in New Issue