# airflow-workflows
OpenAIRE workflows for Airflow.
This repository contains the DAGs and shared utilities used by the OpenAIRE graph-building pipeline running on Apache Airflow. The `graph-pipeline/` package provides three helper modules that drastically reduce the boilerplate needed to define Spark-based workflows.
## Airflow Utility Modules
### `spark_configurator.py`: Spark Operator Factory
Abstracts away the creation of Spark operators so that DAG authors only need to specify the what (main class / script, arguments, jar) and never the how (Kubernetes specs, SparkSubmit wiring, template merging).
The active Spark backend is selected at runtime via the Airflow Variable `spark_provider`:

| Value | Backend |
|---|---|
| `"operator"` (default) | `SparkKubernetesAppOperator`: builds a full `SparkApplication` CRD `template_spec` (metadata, `mainClass` / `type: Python`, deps, `sparkConf`) by deep-merging the caller's parameters with the environment template stored in the `spark_template` Variable. |
| `"submit"` | `SparkSubmitAppOperator`: delegates to `spark-submit`. Automatically distinguishes Java/Scala jobs from PySpark jobs by checking whether `main_class` ends with `.py`. |
| anything else | Falls back to an `EmptyOperator` (useful for dry runs and testing). |
`spark_configurator` is a thin factory: all spec-building and provider-dispatch logic lives in the operator classes defined in `task_decorators.py`. `generate_spark_operator` simply passes its constructor arguments, together with a no-op callable, to those classes.
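The provider switch in the table above can be pictured as a simple lookup. This is a sketch, not the module's actual code: the three classes are empty stand-ins and `select_operator_class` is a hypothetical name. In a DAG, the provider string would typically come from `Variable.get("spark_provider", default_var="operator")`.

```python
# Hypothetical sketch of provider-based dispatch; the classes are empty
# stand-ins for the real operators, not their implementations.
class SparkKubernetesAppOperator: ...
class SparkSubmitAppOperator: ...
class EmptyOperator: ...


_BACKENDS = {
    "operator": SparkKubernetesAppOperator,
    "submit": SparkSubmitAppOperator,
}


def select_operator_class(provider=None):
    """Map a spark_provider value to an operator class.

    None (Variable unset) maps to the documented default, "operator";
    any unrecognised value falls back to EmptyOperator.
    """
    if provider is None:
        provider = "operator"
    return _BACKENDS.get(provider, EmptyOperator)
```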
Key functions:
- `generate_spark_operator(task_id, main_class, arguments, ...)`: main entry point. Accepts optional `jar`, `task_display_name`, `spark_extra_conf`, `image`, and `py_files`.
- `java_action(task_id, main_class, arguments, ...)`: convenience wrapper for non-Spark Java jobs. Delegates to the `RunJavaSparkJob` driver and pins dynamic allocation to a single executor.
- `generate_pyspark_task(task_id, script, arguments, ...)`: convenience wrapper for PySpark jobs. `script` is the path to the `.py` entry point. Optional `py_files` places `.zip`/`.egg` packages on the `PYTHONPATH` of the driver and executors (`deps.pyFiles` / `--py-files`); optional `jar` adds a JVM dependency (e.g. for custom UDFs).
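As a rough illustration of what "pins dynamic allocation to a single executor" can mean, the hypothetical helper below overlays the standard Spark dynamic-allocation properties on a caller's conf. Which keys `java_action` really sets, and whether caller values may override them, are assumptions here.

```python
# Hypothetical sketch; the exact keys java_action sets are an assumption.
SINGLE_EXECUTOR = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "1",
}


def single_executor_conf(extra_conf=None):
    """Return the caller's conf with dynamic allocation capped at one executor."""
    conf = dict(extra_conf or {})
    conf.update(SINGLE_EXECUTOR)  # the pinned values override caller-supplied ones
    return conf
```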
JVM usage example:

```python
from spark_configurator import generate_spark_operator

task = generate_spark_operator(
    task_id='cleansparkjob_publication',
    task_display_name="Clean publication",
    jar="{{ params.get('JAR') }}",
    main_class="eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob",
    arguments=["--inputPath", "{{ params.get('INPUT_PATH') }}/publication", ...]
)
```
PySpark usage example:

```python
from spark_configurator import generate_pyspark_task

# single-file script
task = generate_pyspark_task(
    task_id='my_pyspark_task',
    script="s3://bucket/scripts/my_script.py",
    arguments=["--input", "/data/in"],
    image="{{ params.get('SPARK_IMAGE') }}",
)

# packaged modules + optional JVM UDF jar (task IDs must be unique within a DAG)
packaged_task = generate_pyspark_task(
    task_id='my_pyspark_packaged_task',
    script="s3://bucket/scripts/main.py",
    arguments=["--input", "/data/in"],
    py_files=["s3://bucket/mypackage.zip"],
    jar="s3://bucket/my-udfs.jar",
)
```
### `task_decorators.py`: Operator Classes and `@sparkapp` Decorator
Centralises all Spark spec-building logic and exposes it both as operator classes (consumed by `spark_configurator`) and as the `@task.sparkapp` decorator.
Shared helpers (module-level):
- `_resolve_config(config, defaults)`: merges the dict returned by the callable over the constructor defaults for `jar`, `main_class`, `arguments`, `spark_extra_conf`, `image`, and `py_files`. Derives `is_python` (`main_class` ends with `.py`) and `effective_jar` (the explicit jar if given, otherwise the default DHP jar for JVM jobs, or `None` for Python jobs).
- `_build_k8s_spec(task_id, r)`: builds the complete `SparkApplication` `template_spec` dict. Python jobs get `type: Python` with optional `deps.pyFiles` / `deps.jars`; JVM jobs get `mainClass` / `deps.jars`. Merges `sparkConf` and the `spark_template` environment template.
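A condensed sketch of these two helpers, assuming plain dicts throughout. `DEFAULT_DHP_JAR`, the rule that a `None` override keeps the default, the metadata naming, and the flattened `{"metadata", "spec"}` layout are all assumptions; the real `template_spec` may nest these fields differently, and for JVM jobs `type` and `mainApplicationFile` are assumed to come from the `spark_template` environment template.

```python
# Hypothetical sketch of _resolve_config / _build_k8s_spec; names, defaults and
# spec layout are assumptions, not the repository's code.
DEFAULT_DHP_JAR = "s3://bucket/dhp-shaded.jar"  # placeholder for the DHP default jar
KEYS = ("jar", "main_class", "arguments", "spark_extra_conf", "image", "py_files")


def resolve_config(config, defaults):
    """Overlay the callable's return dict on the constructor defaults."""
    r = {key: defaults.get(key) for key in KEYS}
    # Assumption: a None value in the returned dict means "keep the default".
    r.update({k: v for k, v in (config or {}).items() if k in KEYS and v is not None})
    r["is_python"] = str(r["main_class"] or "").endswith(".py")
    # Explicit jar wins; otherwise the DHP jar for JVM jobs, nothing for Python.
    r["effective_jar"] = r["jar"] or (None if r["is_python"] else DEFAULT_DHP_JAR)
    return r


def _merge_first_wins(first, second):
    """Recursive merge where values from `first` take precedence."""
    out = dict(second)
    for key, value in first.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = _merge_first_wins(value, out[key])
        else:
            out[key] = value
    return out


def build_k8s_spec(task_id, r, env_template=None):
    """Build a simplified SparkApplication spec, then layer it over the env template."""
    spec = {
        "arguments": list(r["arguments"] or []),
        "sparkConf": dict(r["spark_extra_conf"] or {}),
    }
    if r["image"]:
        spec["image"] = r["image"]
    if r["is_python"]:
        spec["type"] = "Python"
        spec["mainApplicationFile"] = r["main_class"]  # the .py entry point
        deps = {}
        if r["py_files"]:
            deps["pyFiles"] = list(r["py_files"])
        if r["effective_jar"]:
            deps["jars"] = [r["effective_jar"]]
        if deps:
            spec["deps"] = deps
    else:
        # type / mainApplicationFile for JVM jobs assumed to come from the template
        spec["mainClass"] = r["main_class"]
        spec["deps"] = {"jars": [r["effective_jar"]]}
    # Kubernetes names must be lowercase DNS labels (partial normalisation only).
    app = {"metadata": {"name": task_id.lower().replace("_", "-")}, "spec": spec}
    return _merge_first_wins(app, env_template or {})
```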
Operator classes:
- `SparkKubernetesAppOperator(SparkKubernetesOperator)`: the Kubernetes CRD operator. Stores all Spark parameters as `self._defaults`. In `execute()`, it invokes the callable, calls `_resolve_config` and `_build_k8s_spec`, Jinja-renders the result, then delegates to `SparkKubernetesOperator.execute()`.
- `SparkSubmitAppOperator(SparkSubmitOperator)`: the `spark-submit` operator, with the same constructor interface. In `execute()`, it invokes the callable, calls `_resolve_config`, writes the resolved values to the `SparkSubmitOperator` instance variables (`_application`, `_java_class`, `_jars`, `_py_files`, `_application_args`, `_conf`), then delegates to `SparkSubmitOperator.execute()`.
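For the `spark-submit` path, one plausible mapping from a resolved config (a dict with `is_python`, `main_class`, `effective_jar`, `py_files`, `spark_extra_conf`, and `arguments`, mirroring what `_resolve_config` derives) to the command line is sketched below. `to_spark_submit_argv` is hypothetical: the real operator sets `SparkSubmitOperator` attributes and lets the hook build the command. The flags used (`--class`, `--conf`, `--py-files`, `--jars`) are standard `spark-submit` options.

```python
# Hypothetical sketch: how resolved values could map onto spark-submit flags.
# Comments name the SparkSubmitOperator attribute each part corresponds to.
def to_spark_submit_argv(r):
    argv = ["spark-submit"]
    if not r["is_python"]:
        argv += ["--class", r["main_class"]]  # _java_class
    for key, value in sorted((r["spark_extra_conf"] or {}).items()):
        argv += ["--conf", f"{key}={value}"]  # _conf
    if r["py_files"]:
        argv += ["--py-files", ",".join(r["py_files"])]  # _py_files
    if r["is_python"]:
        if r["effective_jar"]:
            argv += ["--jars", r["effective_jar"]]  # _jars (e.g. a UDF jar)
        application = r["main_class"]  # _application: the .py entry point
    else:
        application = r["effective_jar"]  # _application: the job jar
    argv.append(application)
    argv += list(r["arguments"] or [])  # _application_args
    return argv
```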
Both operators share the same constructor signature:
`jar`, `main_class`, `arguments`, `spark_extra_conf`, `image`, `py_files`
Parameters set at construction time are used as defaults; the callable's return dict can override any subset of them at runtime.
**Important:** the callable is never invoked at DAG-parse time, only inside `execute()`, when Airflow template variables (`{{ ds }}`, etc.) and the full task context are available.
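The deferred-invocation rule can be illustrated with a toy operator. `DeferredSparkOperator` is a hypothetical stand-in, not an Airflow class: it stores the callable at construction time and only calls it in `execute()`, overlaying the returned dict on the constructor defaults.

```python
# Hypothetical stand-in illustrating the deferred-callable pattern.
class DeferredSparkOperator:
    def __init__(self, python_callable, **defaults):
        # Construction (DAG-parse time) only stores things; nothing is called.
        self.python_callable = python_callable
        self._defaults = defaults

    def execute(self, context):
        # The callable runs only here, when the task instance actually executes.
        overrides = self.python_callable() or {}
        return {**self._defaults, **overrides}
```

Because construction never calls the callable, parsing a DAG file stays cheap and the callable can safely rely on runtime-only state such as `get_current_context()`.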
`sparkapp_task` decorator:
`sparkapp_task()` plugs into Airflow's `task_decorator_factory` and picks `SparkKubernetesAppOperator` or `SparkSubmitAppOperator` based on `spark_provider`. The module registers itself as a custom Airflow provider (`sparkapp-airflow-providers`) at import time so `@task.sparkapp` is discoverable by name.
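Airflow resolves `@task.<name>` through provider metadata: a provider's info dict lists its decorators under `task-decorators`, each entry pairing a decorator `name` with the import path of its factory (`class-name`). Assuming `sparkapp-airflow-providers` follows that standard shape, its metadata could look roughly like this; the import-time registration step itself is not shown, and the `class-name` path is a guess.

```python
# Hypothetical provider metadata for @task.sparkapp; the class-name path is assumed.
def get_provider_info():
    return {
        "package-name": "sparkapp-airflow-providers",
        "name": "sparkapp",
        "description": "Spark application task decorator",
        "task-decorators": [
            # @task.sparkapp -> factory function importable from this path
            {"name": "sparkapp", "class-name": "task_decorators.sparkapp_task"},
        ],
    }
```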
Usage example:

```python
from airflow.decorators import task
from airflow.operators.python import get_current_context

import task_decorators  # noqa: F401  (assumed: importing registers @task.sparkapp)


@task.sparkapp(
    task_id='my_spark_step',
    jar="{{ params.get('JAR') }}",
    main_class="com.example.MyJob",
)
def my_spark_step():
    # The returned dict overrides / extends the constructor defaults at runtime
    return {
        "arguments": ["--input", "/data/in", "--output", "/data/out"],
        "spark_extra_conf": {"spark.executor.memory": "4g"},
    }


# PySpark variant: image and py_files are resolved at runtime from DAG params
@task.sparkapp(task_id='my_pyspark_step')
def my_pyspark_step():
    context = get_current_context()
    params = context["params"]
    return {
        "main_class": params["SCRIPT_PATH"],  # .py suffix -> Python mode auto-detected
        "arguments": params["ARGUMENTS"],
        "image": params.get("SPARK_IMAGE") or None,
        "py_files": params.get("PY_FILES") or None,
    }
```
### `dag_utils.py`: DAG Helpers and Constants
A collection of shared utilities and constants used across all DAGs.
#### Selective Execution (`START_FROM` / `STOP_AT`)
When a DAG has many sequential steps, re-running everything from scratch after a mid-pipeline failure is expensive. dag_utils provides a selective execution mechanism:
- `auto_skip(task)`: wraps any operator with a `pre_execute` hook that reads `START_FROM`, `STOP_AT`, and per-task `SKIP_<TASK_ID>` flags from `dag_run.conf`. Tasks upstream of `START_FROM` or downstream of `STOP_AT` are skipped automatically. The trigger rule of affected tasks is changed from `all_success` to `none_failed` so that downstream tasks still fire after skips.
- `dag_add_skipping(dag)`: applies `auto_skip` to every task in the DAG and populates the `START_FROM` / `STOP_AT` params with an enum of all task IDs for easy selection in the Airflow UI.
- `skip_if_needed(fn)`: a decorator variant of the same logic, intended for `@task`-decorated functions.
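The skip rules reduce to graph reachability. In this hypothetical sketch, `tasks_to_skip` takes a `{task_id: [direct downstream task_ids]}` map and a `dag_run.conf`-like dict and returns the IDs that would be skipped. The real helper makes this decision per task inside `pre_execute`, and matching `SKIP_<TASK_ID>` against the task ID verbatim is an assumption.

```python
from collections import deque


def _reachable(start, edges):
    """All nodes reachable from `start` by following `edges` (start excluded)."""
    seen, queue = set(), deque([start])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def tasks_to_skip(downstream, conf):
    """Hypothetical sketch of the START_FROM / STOP_AT / SKIP_<TASK_ID> rules."""
    upstream = {}
    for src, dsts in downstream.items():
        for dst in dsts:
            upstream.setdefault(dst, []).append(src)
    skip = set()
    if conf.get("START_FROM"):  # everything upstream of the start task
        skip |= _reachable(conf["START_FROM"], upstream)
    if conf.get("STOP_AT"):  # everything downstream of the stop task
        skip |= _reachable(conf["STOP_AT"], downstream)
    all_tasks = set(downstream) | set(upstream)
    # Assumption: per-task flags use the task id verbatim, e.g. SKIP_CreateSimRel.
    skip |= {t for t in all_tasks if conf.get(f"SKIP_{t}")}
    return skip
```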
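Usage example (from `dedup.py`):

```python
from dag_utils import auto_skip
from spark_configurator import generate_spark_operator

simrel = auto_skip(generate_spark_operator(
    task_id='CreateSimRel',
    # ... (remaining arguments elided)
))
```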
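#### Deep Dictionary Merge
`merge_dicts(*dicts)`: recursively merges multiple dicts with first-dict-wins semantics (on a conflicting key, the value from the earliest dict is kept, while nested dicts are merged). Used everywhere to layer caller-specific Spark config on top of environment defaults.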
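A minimal sketch of first-dict-wins recursive merging, as an illustration rather than the code in `dag_utils.py`. Dicts are applied from last to first, so earlier arguments overwrite later ones, and nested dicts are merged key by key instead of being replaced wholesale.

```python
def _overlay(base, top):
    """Return a copy of `base` updated with `top`; nested dicts merge recursively."""
    out = dict(base)
    for key, value in top.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = _overlay(out[key], value)
        else:
            out[key] = value
    return out


def merge_dicts(*dicts):
    """Sketch of a recursive merge where the first dict wins on conflicts."""
    result = {}
    for d in reversed(dicts):  # lowest priority first, highest priority last
        result = _overlay(result, d)
    return result
```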
#### Chaining Helpers
`chain_sequence(*tasks)`: a more flexible version of Airflow's `chain()`. Accepts a mix of single tasks and lists (fan-out / fan-in). All items in a list run in parallel, and sequences nested inside a list are chained automatically.
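To make the fan-out / fan-in rules concrete, this hypothetical sketch computes dependency edges instead of wiring real operators (in Airflow each pair would become `upstream >> downstream`). It assumes tasks are plain strings, a `list` means parallel branches, and a `tuple` inside a list is an internal sequence to chain; the real helper's representation may differ.

```python
# Hypothetical sketch: returns (upstream, downstream) pairs instead of wiring tasks.
def _expand(item):
    """Return (heads, tails, edges) for a single task, a list, or a tuple."""
    if isinstance(item, list):  # parallel branches: fan-out / fan-in
        heads, tails, edges = [], [], []
        for branch in item:
            h, t, e = _expand(branch)
            heads.extend(h)
            tails.extend(t)
            edges.extend(e)
        return heads, tails, edges
    if isinstance(item, tuple):  # internal sequence: chain its members in order
        return _chain(item)
    return [item], [item], []


def _chain(items):
    heads, tails, edges = None, [], []
    for item in items:
        h, t, e = _expand(item)
        edges.extend(e)
        if heads is None:
            heads = h
        else:
            # every tail of the previous step feeds every head of this one
            edges.extend((a, b) for a in tails for b in h)
        tails = t
    return heads or [], tails, edges


def chain_sequence(*tasks):
    return _chain(tasks)[2]
```

For example, `chain_sequence("a", ["b", ("c", "d")], "e")` fans out from `a` to `b` and `c`, chains `c` to `d`, and fans back in from `b` and `d` to `e`.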
#### Shared Constants

| Name | Purpose |
|---|---|
| `BUILD_PHASES` | Maps human-readable phase names (`"raw"`, `"dedup"`, `"enriched"`, ...) to directory prefixes used on the filesystem. |
| `GRAPH_ENTITIES` | Ordered list of all graph entity types (`publication`, `dataset`, `software`, ...). |
| `GRAPH_ENTITIES_CLASS_NAMES` | Maps entity names to their fully-qualified Java class names. |
| `get_fs_prefix()` | Returns the base filesystem path from the `default_fs` Airflow Variable. |
| `get_dhp_jar()` | Returns the path to the DHP shaded jar (from the `default_jar` Variable, or a built-in default). |
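These constants are typically combined into concrete paths. The sketch below assumes a `<fs_prefix>/<phase dir>/<entity>` layout and uses made-up `BUILD_PHASES` values and a truncated entity list; `phase_paths` is hypothetical, and in a real DAG `fs_prefix` would come from `get_fs_prefix()`.

```python
# Hypothetical placeholders; the real values live in dag_utils.py.
BUILD_PHASES = {"raw": "raw_graph", "dedup": "dedup_graph"}
GRAPH_ENTITIES = ["publication", "dataset", "software"]


def phase_paths(fs_prefix, phase):
    """Map each entity to <fs_prefix>/<phase dir>/<entity> (assumed layout)."""
    base = f"{fs_prefix.rstrip('/')}/{BUILD_PHASES[phase]}"
    return {entity: f"{base}/{entity}" for entity in GRAPH_ENTITIES}
```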