Apache Airflow: How to create a conditional task in Airflow
Ты проектируешь даг и тебе нужно ветвление. Не просто «запусти таски A, B, C», а «если A прошло успешно — сделай B, если упало — сделай C, а потом в любом случа
Ты проектируешь даг и тебе нужно ветвление. Не просто «запусти таски A, B, C», а «если A прошло успешно — сделай B, если упало — сделай C, а потом в любом случае запусти D». И кажется, что нужно городить сложную логику с XCom и BranchOperator.
На самом деле, всё проще. В Airflow для этого есть триггерные правила (trigger rules). Каждый оператор имеет параметр trigger_rule, который определяет, когда ему можно стартовать.
Как это работает по умолчанию?
Дефолтное правило — all_success. Это значит «запустись, только если все прямые родители задачи выполнились успешно». Именно поэтому в простой цепочке task1 >> task2 второй таск ждёт успешного завершения первого.
А что нам нужно?
Нам нужно создать условие. После task1 должен выполниться либо task2a (если успех), либо task2b (если провал). А task3 должен запуститься в любом случае, когда task2a ИЛИ task2b завершатся.
Ключ — разные триггерные правила для task2a и task2b.
- Для
task2aправило —all_success. Он ждёт успеха своего родителя (task1). - Для
task2bправило —all_failed. Он ждёт провала своего родителя (task1).
А для task3 правило — all_done. Он ждёт, когда все его родители (task2a и task2b) завершатся, независимо от их статуса.
Вот как это выглядит в коде, используя SSHExecuteOperator, как в твоём случае:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<ТВОЙ CONNECTION ID ИЗ UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<ТВОЯ КОМАНДА>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<ТВОЯ КОМАНДА>,
ssh_hook=sshHook,
trigger_rule=TriggerRule.ALL_SUCCESS, # Ждём успеха task_1
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<ТВОЯ КОМАНДА>,
ssh_hook=sshHook,
trigger_rule=TriggerRule.ALL_FAILED, # Ждём провала task_1
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<ТВОЯ КОМАНДА>,
ssh_hook=sshHook,
trigger_rule=TriggerRule.ALL_DONE, # Ждём завершения task_2a ИЛИ task_2b
dag=dag)
# Собираем пайплайн
task_1 >> [task_2a, task_2b]
[task_2a, task_2b] >> task_3
Важные моменты:
- Задачи
task2aиtask2bдолжны быть зависимы отtask1. Мы задаём эту зависимость строкойtask_1 >> [task_2a, task_2b]. - Обе «условные» задачи (
2aи2b) будут находиться в состоянииskipped(пропущены), если их триггерное правило не выполнилось. Это нормально. В логе scheduler будетTask Skipped - not required. - Правило
all_doneдляtask3гарантирует, что он запустится после того, как та из родительских задач, которая должна была выполниться, завершится.
Таким образом, BranchOperator и XCom здесь не нужны. Вся логика зашита в механизм зависимостей и триггерных правил. Это канонический и самый чистый способ реализовать простое ветвление «if-else» в Airflow.
Итог: смотри на trigger_rule. Правила all_success, all_failed, one_success и all_done покрывают большинство сценариев условного выполнения. Просто явно укажи нужное правило, а Airflow сделает всю работу за тебя.