OpenAIRE workflows for Airflow
Giambattista Bloisi 1a537a9e0f Rename removeContextPath in removeConstraintsPath for Result2CommunitySemRelPropagation step 2026-05-08 14:06:15 +02:00
graph-aggregation updated wrong param 2026-05-06 15:46:27 +02:00
graph-monitor updated wf to pass the image as parram 2026-04-21 13:57:34 +02:00
graph-pipeline Rename removeContextPath in removeConstraintsPath for Result2CommunitySemRelPropagation step 2026-05-08 14:06:15 +02:00
.gitignore Initial commit 2025-11-12 16:01:38 +01:00
LICENSE Initial commit 2025-11-12 16:01:38 +01:00
README.md refactored helper methods to run spark and pyspark apps 2026-02-24 21:30:28 +01:00


airflow-workflows

OpenAIRE workflows for Airflow.

This repository contains the DAGs and shared utilities used by the OpenAIRE graph-building pipeline running on Apache Airflow. The graph-pipeline/ package provides three helper modules (spark_configurator.py, task_decorators.py and dag_utils.py) that remove most of the boilerplate needed to define Spark-based workflows.


Airflow Utility Modules

spark_configurator.py — Spark Operator Factory

Abstracts away the creation of Spark operators so that DAG authors only need to specify the what (main class / script, arguments, jar) and never the how (Kubernetes specs, SparkSubmit wiring, template merging).

The active Spark backend is selected at runtime via the Airflow Variable spark_provider:

| Value | Backend |
|---|---|
| "operator" (default) | SparkKubernetesAppOperator: builds a full SparkApplication CRD template_spec (metadata, mainClass / type: Python, deps, sparkConf) by deep-merging the caller's parameters with the environment template stored in the spark_template Variable. |
| "submit" | SparkSubmitAppOperator: delegates to spark-submit and automatically distinguishes Java/Scala jobs from PySpark jobs by checking whether main_class ends with .py. |
| anything else | Falls back to an EmptyOperator (useful for dry-run / testing). |
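The dispatch in the table can be sketched in a few lines. This is a minimal, self-contained illustration, not the module's code: select_backend and is_python_job are hypothetical names, the sketch returns operator class names as strings instead of instantiating Airflow operators, and it assumes an unset spark_provider Variable behaves like "operator".

```python
# Hypothetical sketch of the spark_provider dispatch; the real logic lives in
# task_decorators.py and instantiates Airflow operators instead of strings.
from typing import Optional

DEFAULT_PROVIDER = "operator"  # assumption: an unset Variable behaves like "operator"

BACKENDS = {
    "operator": "SparkKubernetesAppOperator",  # SparkApplication CRD on Kubernetes
    "submit": "SparkSubmitAppOperator",        # plain spark-submit
}


def select_backend(spark_provider: Optional[str] = None) -> str:
    """Map a spark_provider value to the name of the operator class to use."""
    provider = DEFAULT_PROVIDER if spark_provider is None else spark_provider
    # Any unrecognised value falls back to a no-op operator (dry-run / testing).
    return BACKENDS.get(provider, "EmptyOperator")


def is_python_job(main_class: str) -> bool:
    """PySpark jobs are detected purely by the .py suffix of main_class."""
    return main_class.endswith(".py")
```

Keeping the mapping in a dict means an unknown value degrades to a harmless no-op task rather than failing at DAG-parse time.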

spark_configurator is a thin factory: all spec-building and provider-dispatch logic lives in the operator classes defined in task_decorators.py. generate_spark_operator simply forwards its arguments to the operator constructor together with a no-op callable, so the constructor arguments are used unchanged at runtime.

Key functions:

  • generate_spark_operator(task_id, main_class, arguments, ...) — main entry-point. Accepts optional jar, task_display_name, spark_extra_conf, image, and py_files.
  • java_action(task_id, main_class, arguments, ...) — convenience wrapper for non-Spark Java jobs. Delegates to the RunJavaSparkJob driver and pins dynamic allocation to a single executor.
  • generate_pyspark_task(task_id, script, arguments, ...) — convenience wrapper for PySpark jobs. script is the path to the .py entry-point. Optional py_files ships .zip/.egg packages to the driver and executors and adds them to the Python path (deps.pyFiles / --py-files); optional jar adds a JVM dependency (e.g. for custom UDFs).

JVM usage example:

```python
from spark_configurator import generate_spark_operator

task = generate_spark_operator(
    task_id='cleansparkjob_publication',
    task_display_name="Clean publication",
    jar="{{ params.get('JAR') }}",
    main_class="eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob",
    arguments=["--inputPath", "{{ params.get('INPUT_PATH') }}/publication", ...]
)
```

PySpark usage example:

```python
from spark_configurator import generate_pyspark_task

# single-file script
task = generate_pyspark_task(
    task_id='my_pyspark_task',
    script="s3://bucket/scripts/my_script.py",
    arguments=["--input", "/data/in"],
    image="{{ params.get('SPARK_IMAGE') }}",
)

# packaged modules + optional JVM UDF jar (task IDs must be unique within a DAG)
task_packaged = generate_pyspark_task(
    task_id='my_pyspark_task_packaged',
    script="s3://bucket/scripts/main.py",
    arguments=["--input", "/data/in"],
    py_files=["s3://bucket/mypackage.zip"],
    jar="s3://bucket/my-udfs.jar",
)
```

task_decorators.py — Operator Classes & @sparkapp Decorator

Centralises all Spark spec-building logic and exposes it both as operator classes (consumed by spark_configurator) and as the @task.sparkapp decorator.

Shared helpers (module-level):

  • _resolve_config(config, defaults) — merges a callable's return dict over the constructor defaults for jar, main_class, arguments, spark_extra_conf, image, and py_files. Derives is_python (.py suffix) and effective_jar (the explicit jar if given, otherwise the default DHP jar for JVM jobs, or None for Python jobs).
  • _build_k8s_spec(task_id, r) — builds the complete SparkApplication template_spec dict. Python jobs get type: Python with optional deps.pyFiles / deps.jars; JVM jobs get mainClass / deps.jars. Merges sparkConf and the spark_template env template.
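The two helpers above can be approximated as follows. This is a hedged sketch under several assumptions, not the module's implementation: DEFAULT_DHP_JAR is a hypothetical placeholder, a None value returned by the callable is assumed to keep the default, the PySpark script is assumed to go into the CRD's mainApplicationFile field, and the metadata name is derived from task_id by a hypothetical underscore-to-hyphen rule. The env template merge uses first-dict-wins semantics, matching merge_dicts.

```python
import copy
from typing import Any, Dict, Optional

# Hypothetical placeholder for the DHP shaded jar (see get_dhp_jar()).
DEFAULT_DHP_JAR = "s3://example-bucket/dhp-shaded.jar"

KEYS = ("jar", "main_class", "arguments", "spark_extra_conf", "image", "py_files")


def resolve_config(config: Optional[Dict[str, Any]], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay the callable's return dict on the constructor defaults."""
    config = config or {}
    r: Dict[str, Any] = {}
    for key in KEYS:
        value = config.get(key)
        # Assumption: a None value in the callable's dict keeps the default.
        r[key] = value if value is not None else defaults.get(key)
    r["arguments"] = list(r["arguments"] or [])
    r["is_python"] = (r["main_class"] or "").endswith(".py")
    # Explicit jar first, then the DHP jar for JVM jobs, none for Python jobs.
    r["effective_jar"] = r["jar"] or (None if r["is_python"] else DEFAULT_DHP_JAR)
    return r


def _merge_first_wins(first: Dict[str, Any], second: Dict[str, Any]) -> Dict[str, Any]:
    """Deep-merge two dicts; values from `first` win on conflicts."""
    out = copy.deepcopy(second)
    for key, value in first.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = _merge_first_wins(value, out[key])
        else:
            out[key] = copy.deepcopy(value)
    return out


def build_k8s_spec(task_id: str, r: Dict[str, Any], env_template: Dict[str, Any]) -> Dict[str, Any]:
    """Build a SparkApplication template_spec from a resolved config."""
    spec: Dict[str, Any] = {
        "arguments": r["arguments"],
        "sparkConf": dict(r["spark_extra_conf"] or {}),
    }
    if r["image"]:
        spec["image"] = r["image"]
    deps: Dict[str, Any] = {}
    if r["is_python"]:
        spec["type"] = "Python"
        # Assumption: the .py script is passed as mainApplicationFile.
        spec["mainApplicationFile"] = r["main_class"]
        if r["py_files"]:
            deps["pyFiles"] = list(r["py_files"])
    else:
        spec["mainClass"] = r["main_class"]
    if r["effective_jar"]:
        deps["jars"] = [r["effective_jar"]]
    if deps:
        spec["deps"] = deps
    # Assumption: Kubernetes resource names cannot contain underscores.
    metadata = {"name": task_id.lower().replace("_", "-")}
    # Caller-specific values are layered on top of the environment template.
    return _merge_first_wins({"metadata": metadata, "spec": spec}, env_template)
```

Because the caller's spec is merged first, anything the environment template defines but the caller does not (for example driver resources) survives, while caller-supplied sparkConf keys override the template's.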

Operator classes:

  • SparkKubernetesAppOperator(SparkKubernetesOperator) — the K8s CRD operator. Stores all Spark parameters as self._defaults. In execute(): invokes the callable, calls _resolve_config + _build_k8s_spec, Jinja-renders the result, then delegates to SparkKubernetesOperator.execute().
  • SparkSubmitAppOperator(SparkSubmitOperator) — the spark-submit operator. Same constructor interface. In execute(): invokes the callable, calls _resolve_config, writes the resolved values to SparkSubmitOperator instance variables (_application, _java_class, _jars, _py_files, _application_args, _conf), then delegates to SparkSubmitOperator.execute().
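To make the submit path more concrete, the hypothetical helper below renders a resolved config (a plain dict with the shape assumed for _resolve_config's output) as the equivalent spark-submit command line. It uses only standard spark-submit options (--conf, --class, --jars, --py-files). The real operator does not build this list; it assigns SparkSubmitOperator fields and lets the provider assemble the command. Treating the jar as the application for JVM jobs is an assumption.

```python
from typing import Any, Dict, List


def spark_submit_argv(r: Dict[str, Any]) -> List[str]:
    """Approximate the spark-submit command line for a resolved config.

    Hypothetical helper: the real operator sets SparkSubmitOperator fields
    (_application, _java_class, _jars, _py_files, _conf, _application_args)
    and lets the provider build the command itself.
    """
    argv = ["spark-submit"]
    # Each Spark property becomes a --conf key=value pair (sorted for stability).
    for key, value in sorted((r.get("spark_extra_conf") or {}).items()):
        argv += ["--conf", f"{key}={value}"]
    if r["is_python"]:
        application = r["main_class"]  # the .py entry-point is the application
        if r.get("py_files"):
            argv += ["--py-files", ",".join(r["py_files"])]
        if r.get("effective_jar"):
            argv += ["--jars", r["effective_jar"]]  # optional JVM dependency
    else:
        argv += ["--class", r["main_class"]]
        application = r["effective_jar"]  # assumption: the jar is the application
    # Application arguments always come after the application itself.
    return argv + [application] + list(r.get("arguments") or [])
```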

Both operators accept the same Spark-specific constructor parameters:

jar, main_class, arguments, spark_extra_conf, image, py_files

Parameters set at construction time are used as defaults; the callable's return dict can override any subset of them at runtime.

Important: the callable is never invoked at DAG-parse time — only inside execute(), when Airflow template variables ({{ ds }}, etc.) and the full task context are available.
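The deferred-evaluation contract can be illustrated with a toy class. LazySparkApp is hypothetical and deliberately ignores Airflow; it only shows the ordering guarantee: the callable is stored at construction (DAG-parse) time and evaluated only inside execute(), where its return dict overrides the constructor defaults key by key.

```python
from typing import Any, Callable, Dict, Optional


class LazySparkApp:
    """Toy stand-in for SparkKubernetesAppOperator / SparkSubmitAppOperator."""

    def __init__(self, python_callable: Callable[[], Optional[Dict[str, Any]]], **defaults: Any):
        self._callable = python_callable  # stored at DAG-parse time, not called
        self._defaults = defaults

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Only here, at run time, is the callable evaluated.
        overrides = self._callable() or {}
        # Keys returned by the callable override the constructor defaults.
        return {**self._defaults, **overrides}
```

Constructing LazySparkApp(my_callable, jar="dhp.jar") never touches my_callable; only a later execute(context) does, which is why the callable may safely read runtime-only state such as the rendered params.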

sparkapp_task decorator:

sparkapp_task() plugs into Airflow's task_decorator_factory and picks SparkKubernetesAppOperator or SparkSubmitAppOperator based on spark_provider. The module registers itself as a custom Airflow provider (sparkapp-airflow-providers) at import time so @task.sparkapp is discoverable by name.

Usage example:

```python
# Airflow 2.x import paths; on Airflow 3 both names are exported by airflow.sdk.
from airflow.decorators import task
from airflow.operators.python import get_current_context

import task_decorators  # noqa: F401  registers @task.sparkapp at import time

@task.sparkapp(
    task_id='my_spark_step',
    jar="{{ params.get('JAR') }}",
    main_class="com.example.MyJob",
)
def my_spark_step():
    # return dict overrides / extends the constructor defaults at runtime
    return {
        "arguments": ["--input", "/data/in", "--output", "/data/out"],
        "spark_extra_conf": {"spark.executor.memory": "4g"},
    }

# PySpark variant: image and py_files resolved at runtime from DAG params
@task.sparkapp(task_id='my_pyspark_step')
def my_pyspark_step():
    context = get_current_context()
    params = context["params"]
    return {
        "main_class": params["SCRIPT_PATH"],      # .py → Python mode auto-detected
        "arguments":  params["ARGUMENTS"],
        "image":      params.get("SPARK_IMAGE") or None,
        "py_files":   params.get("PY_FILES") or None,
    }
```

dag_utils.py — DAG Helpers & Constants

A collection of shared utilities and constants used across all DAGs.

Selective Execution (START_FROM / STOP_AT)

When a DAG has many sequential steps, re-running everything from scratch after a mid-pipeline failure is expensive. dag_utils provides a selective execution mechanism:

  • auto_skip(task) — wraps any operator with a pre_execute hook that reads START_FROM, STOP_AT, and per-task SKIP_<TASK_ID> flags from dag_run.conf. Tasks upstream of START_FROM or downstream of STOP_AT are automatically skipped. The trigger rule of affected tasks is changed from all_success to none_failed so that downstream tasks still fire after skips.
  • dag_add_skipping(dag) — applies auto_skip to every task in the DAG and populates the START_FROM / STOP_AT params with an enum of all task IDs for easy selection in the Airflow UI.
  • skip_if_needed(fn) — a decorator variant of the same logic, intended for @task-decorated functions.

Usage example (from dedup.py):

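```python
from dag_utils import auto_skip
from spark_configurator import generate_spark_operator

simrel = auto_skip(generate_spark_operator(
    task_id='CreateSimRel',
    # ... remaining arguments as in dedup.py
))
```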
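The skip rule described above can be modelled on a plain dependency graph. This sketch is an assumption-laden illustration, not auto_skip itself: tasks_to_skip is a hypothetical name, the graph is a dict mapping each task_id to its direct downstream task_ids, and SKIP_<TASK_ID> is assumed to use the task_id verbatim. "Upstream of START_FROM" and "downstream of STOP_AT" are computed as transitive ancestors and descendants, excluding the boundary tasks themselves.

```python
from collections import deque
from typing import Dict, Iterable, List, Set


def _reachable(start: str, edges: Dict[str, Iterable[str]]) -> Set[str]:
    """All nodes reachable from start by following edges (start excluded)."""
    seen: Set[str] = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def tasks_to_skip(downstream: Dict[str, List[str]], conf: Dict[str, object]) -> Set[str]:
    """Return the task_ids that a START_FROM / STOP_AT / SKIP_* conf would skip."""
    # Invert the graph so ancestors can be found with the same traversal.
    upstream: Dict[str, List[str]] = {}
    for src, dsts in downstream.items():
        for dst in dsts:
            upstream.setdefault(dst, []).append(src)
    skip: Set[str] = set()
    if conf.get("START_FROM"):
        skip |= _reachable(str(conf["START_FROM"]), upstream)   # everything before
    if conf.get("STOP_AT"):
        skip |= _reachable(str(conf["STOP_AT"]), downstream)    # everything after
    all_tasks = set(downstream) | set(upstream)
    # Assumption: per-task flags are keyed as SKIP_<task_id> with the id verbatim.
    skip |= {t for t in all_tasks if conf.get(f"SKIP_{t}")}
    return skip
```

For a pipeline import → clean → dedup → enrich, a conf of START_FROM=clean and STOP_AT=dedup skips import and enrich; with the none_failed trigger rule described above, dedup still runs after clean even though import was skipped.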

Deep Dictionary Merge

  • merge_dicts(*dicts) — recursively merges multiple dicts with first-dict-wins semantics. Used everywhere to layer caller-specific Spark config on top of environment defaults.
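First-dict-wins deep merging can be sketched as below. deep_merge_first_wins is a hypothetical re-implementation for illustration, not the module's merge_dicts: nested dicts are merged recursively, any other value from an earlier dict replaces the later one, and the inputs are left unmodified.

```python
import copy
from typing import Any, Dict


def _merge_two(high: Dict[str, Any], low: Dict[str, Any]) -> Dict[str, Any]:
    """Merge `high` over `low`; `high` wins on every conflicting key."""
    out = copy.deepcopy(low)
    for key, value in high.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = _merge_two(value, out[key])  # recurse into nested dicts
        else:
            out[key] = copy.deepcopy(value)
    return out


def deep_merge_first_wins(*dicts: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge dicts; on conflicts the earliest dict wins."""
    result: Dict[str, Any] = {}
    for d in reversed(dicts):  # apply the lowest-priority dict first
        result = _merge_two(d, result)
    return result
```

Calling it as deep_merge_first_wins(caller_conf, env_defaults) keeps every environment default the caller did not mention while letting the caller override individual nested keys such as a single sparkConf entry.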

Chaining Helpers

  • chain_sequence(*tasks) — a flexible version of Airflow's chain(). Accepts a mix of single tasks and lists (fan-out / fan-in). When a list appears, all items in the list run in parallel; internal sequences within a list are chained automatically.
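The wiring rule can be modelled on strings instead of Airflow tasks. This hypothetical chain_sequence_edges returns the dependency edges that would be created, under one interpretation of the description above: a top-level list holds parallel branches, and a nested list or tuple inside it is a sequential branch chained internally. Every exit task of one step is then connected to every entry task of the next (fan-out / fan-in). The real helper wires operators with Airflow dependencies rather than returning edges.

```python
from typing import List, Sequence, Tuple, Union

Step = Union[str, Sequence[Union[str, Sequence[str]]]]
Edge = Tuple[str, str]


def _expand(step: Step, edges: List[Edge]) -> Tuple[List[str], List[str]]:
    """Return (entry tasks, exit tasks) of a step, adding its internal edges."""
    if isinstance(step, str):
        return [step], [step]
    heads: List[str] = []
    tails: List[str] = []
    for branch in step:  # items of a list run in parallel
        if isinstance(branch, (list, tuple)):
            # Sequential branch: chain its items one after another.
            for prev, nxt in zip(branch, branch[1:]):
                edges.append((prev, nxt))
            heads.append(branch[0])
            tails.append(branch[-1])
        else:
            heads.append(branch)
            tails.append(branch)
    return heads, tails


def chain_sequence_edges(*steps: Step) -> List[Edge]:
    """Connect every exit of each step to every entry of the next one."""
    edges: List[Edge] = []
    prev_tails: List[str] = []
    for step in steps:
        heads, tails = _expand(step, edges)
        edges += [(t, h) for t in prev_tails for h in heads]
        prev_tails = tails
    return edges
```

For example, chain_sequence_edges("a", ["b", ("c", "d")], "e") fans out from a to b and c, chains c before d, and fans back in from b and d to e.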

Shared Constants

| Name | Purpose |
|---|---|
| BUILD_PHASES | Maps human-readable phase names ("raw", "dedup", "enriched", ...) to directory prefixes used on the filesystem. |
| GRAPH_ENTITIES | Ordered list of all graph entity types (publication, dataset, software, ...). |
| GRAPH_ENTITIES_CLASS_NAMES | Maps entity names to their fully-qualified Java class names. |
| get_fs_prefix() | Returns the base filesystem path from the default_fs Airflow Variable. |
| get_dhp_jar() | Returns the path to the DHP shaded jar (from the default_jar Variable, or a built-in default). |
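A sketch of how these pieces might combine into a dataset path. Everything concrete here is hypothetical: the BUILD_PHASES values, the variable contents, the fallback jar path, and the phase_path helper and its <prefix>/<phase directory>/<entity> layout are all assumptions. A small dict stands in for Airflow Variables so the sketch runs without Airflow.

```python
from typing import Optional

# Hypothetical values: the real BUILD_PHASES entries and Variable contents
# are not reproduced here.
BUILD_PHASES = {"raw": "raw_graph", "dedup": "dedup_graph"}

# Stand-in for Airflow Variables so the sketch runs without Airflow.
FAKE_VARIABLES = {"default_fs": "hdfs://namenode:8020"}
FALLBACK_DHP_JAR = "/opt/jars/dhp-shaded.jar"  # hypothetical built-in default


def get_variable(key: str, default: Optional[str] = None) -> Optional[str]:
    return FAKE_VARIABLES.get(key, default)


def get_fs_prefix() -> str:
    return get_variable("default_fs")


def get_dhp_jar() -> str:
    # default_jar Variable if set, otherwise the built-in fallback.
    return get_variable("default_jar", FALLBACK_DHP_JAR)


def phase_path(phase: str, entity: str) -> str:
    """Hypothetical layout: <fs prefix>/<phase directory>/<entity>."""
    return f"{get_fs_prefix().rstrip('/')}/{BUILD_PHASES[phase]}/{entity}"
```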