initial stage
This commit is contained in:
parent
9581a86313
commit
b23ddd3002
|
@ -6,6 +6,7 @@ from datetime import timedelta
|
|||
import pendulum
|
||||
from airflow.decorators import dag, task_group
|
||||
from airflow.decorators import task
|
||||
from airflow.exceptions import AirflowSkipException
|
||||
from airflow.operators.empty import EmptyOperator
|
||||
from airflow.operators.python import get_current_context
|
||||
from airflow.utils.helpers import chain
|
||||
|
@ -51,6 +52,8 @@ def import_s3_openaire_dump():
|
|||
def compute_batches():
|
||||
nonlocal entity
|
||||
kwargs = get_current_context()
|
||||
if entity not in kwargs["params"]["ENTITIES"]:
|
||||
raise AirflowSkipException(f"Skipping {entity}")
|
||||
return [[(entity, '1'), (entity, '2')], [], []]
|
||||
|
||||
@task(executor_config={
|
||||
|
|
Loading…
Reference in New Issue