Apache Airflow: Proper way to create dynamic workflows in Airflow
Ты спроектировал идеальный пайплайн: Task A производит N результатов, и для каждого нужно запустить долгий и тяжёлый Task B, а потом собрать всё в Task C. Загру
Ты спроектировал идеальный пайплайн: Task A производит N результатов, и для каждого нужно запустить долгий и тяжёлый Task B, а потом собрать всё в Task C. Загружаешь DAG в Airflow, а он смотрит на тебя пустым интерфейсом — как динамические задачи создавать, он не знает.
Классический даг в Airflow — это статическая схема. Он компилируется один раз, при запуске scheduler, и все задачи должны быть известны в этот момент. Но мир данных не такой.
Идея №1: Костыль через отдельный даг
Первое, что приходит в голову — разбить на два дага. В данных описан именно этот метод.
Запускаешь Dag 1, где Task A через TriggerDagRunOperator создаёт и триггерит Dag 2. В Dag 2 динамически, внутри python_callable оператора-триггера, генерируется веер из Task B.1…B.N. А потом Dag 1 ждёт завершения Dag 2 через ExternalTaskSensor и выполняет Task C.
Это работает. Но это сложно, нужно следить за двумя дагами, а датчик (ExternalTaskSensor) будет постоянно опрашивать метабазу, создавая лишнюю нагрузку. И да, это выглядит как костыль, потому что это и есть костыль.
Решение: Динамический Task Mapping (Airflow 2.3+)
Начиная с Airflow 2.3, появился механизм, который решает эту проблему изящно — Dynamic Task Mapping.
Смысл в том, что Task A возвращает список или словарь с параметрами для выполнения. А следующая задача, объявленная как динамическая (expand), создаст по одной своей копии (маппированной задаче) для каждого элемента этого списка.
Вот как это выглядит в коде:
@task
def task_a():
# Выполняем логику, которая определяет, сколько нужно B-задач
# Например, получаем список файлов, ID шардов, названия таблиц
list_of_items = calculate_items()
return list_of_items
@task
def task_b(item):
# Долгая и тяжёлая обработка для каждого элемента
process_for_hours(item)
@task
def task_c():
# Агрегируем результаты после всех B
aggregate_results()
# Сама DAG-структура
list_from_a = task_a()
# Ключевой момент: expand создаст задачи динамически
task_b.expand(item=list_from_a) >> task_c()
После выполнения task_a Airflow увидит возвращённый список и автоматически развернёт N параллельных экземпляров task_b. Все они будут видны в UI как группа, а task_c запустится только после завершения всей этой динамически созданной пачки.
Что выбрать?
Костыль с двумя дагами имеет право на жизнь в старых версиях Airflow (<2.3) или в очень специфичных сценариях изоляции. Но это ад для поддержки.
Dynamic Task Mapping — это каноничный и встроенный способ Airflow решать задачи с неизвестным на стадии компиляции графа параллелизмом. Он проще, нативнее и вся логика остаётся в рамках одного DAG.
Итог: обнови Airflow до 2.3+ и используй expand. Это то, для чего он был создан.