Errors
Apache Airflow

Apache Airflow: How to create a conditional task in Airflow

Ты проектируешь даг и тебе нужно ветвление. Не просто «запусти таски A, B, C», а «если A прошло успешно — сделай B, если упало — сделай C, а потом в любом случа

Ты проектируешь даг и тебе нужно ветвление. Не просто «запусти таски A, B, C», а «если A прошло успешно — сделай B, если упало — сделай C, а потом в любом случае запусти D». И кажется, что нужно городить сложную логику с XCom и BranchOperator.

На самом деле, всё проще. В Airflow для этого есть триггерные правила (trigger rules). Каждый оператор имеет параметр trigger_rule, который определяет, когда ему можно стартовать.

Как это работает по умолчанию? Дефолтное правило — all_success. Это значит «запустись, только если все прямые родители задачи выполнились успешно». Именно поэтому в простой цепочке task1 >> task2 второй таск ждёт успешного завершения первого.

А что нам нужно? Нам нужно создать условие. После task1 должен выполниться либо task2a (если успех), либо task2b (если провал). А task3 должен запуститься в любом случае, когда task2a ИЛИ task2b завершатся.

Ключ — разные триггерные правила для task2a и task2b.

  • Для task2a правило — all_success. Он ждёт успеха своего родителя (task1).
  • Для task2b правило — all_failed. Он ждёт провала своего родителя (task1).

А для task3 правило — all_done. Он ждёт, когда все его родители (task2a и task2b) завершатся, независимо от их статуса.

Вот как это выглядит в коде, используя SSHExecuteOperator, как в твоём случае:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<ТВОЙ CONNECTION ID ИЗ UI>)

task_1 = SSHExecuteOperator(
    task_id='task_1',
    bash_command=<ТВОЯ КОМАНДА>,
    ssh_hook=sshHook,
    dag=dag)

task_2a = SSHExecuteOperator(
    task_id='task_2a',
    bash_command=<ТВОЯ КОМАНДА>,
    ssh_hook=sshHook,
    trigger_rule=TriggerRule.ALL_SUCCESS,  # Ждём успеха task_1
    dag=dag)

task_2b = SSHExecuteOperator(
    task_id='task_2b',
    bash_command=<ТВОЯ КОМАНДА>,
    ssh_hook=sshHook,
    trigger_rule=TriggerRule.ALL_FAILED,  # Ждём провала task_1
    dag=dag)

task_3 = SSHExecuteOperator(
    task_id='task_3',
    bash_command=<ТВОЯ КОМАНДА>,
    ssh_hook=sshHook,
    trigger_rule=TriggerRule.ALL_DONE,  # Ждём завершения task_2a ИЛИ task_2b
    dag=dag)

# Собираем пайплайн
task_1 >> [task_2a, task_2b]
[task_2a, task_2b] >> task_3

Важные моменты:

  1. Задачи task2a и task2b должны быть зависимы от task1. Мы задаём эту зависимость строкой task_1 >> [task_2a, task_2b].
  2. Обе «условные» задачи (2a и 2b) будут находиться в состоянии skipped (пропущены), если их триггерное правило не выполнилось. Это нормально. В логе scheduler будет Task Skipped - not required.
  3. Правило all_done для task3 гарантирует, что он запустится после того, как та из родительских задач, которая должна была выполниться, завершится.

Таким образом, BranchOperator и XCom здесь не нужны. Вся логика зашита в механизм зависимостей и триггерных правил. Это канонический и самый чистый способ реализовать простое ветвление «if-else» в Airflow.

Итог: смотри на trigger_rule. Правила all_success, all_failed, one_success и all_done покрывают большинство сценариев условного выполнения. Просто явно укажи нужное правило, а Airflow сделает всю работу за тебя.