Errors
Apache Airflow

Apache Airflow: Proper way to create dynamic workflows in Airflow

Ты спроектировал идеальный пайплайн: Task A производит N результатов, и для каждого нужно запустить долгий и тяжёлый Task B, а потом собрать всё в Task C. Загру

Ты спроектировал идеальный пайплайн: Task A производит N результатов, и для каждого нужно запустить долгий и тяжёлый Task B, а потом собрать всё в Task C. Загружаешь DAG в Airflow, а он смотрит на тебя пустым интерфейсом — как динамические задачи создавать, он не знает.

Классический даг в Airflow — это статическая схема. Он компилируется один раз, при запуске scheduler, и все задачи должны быть известны в этот момент. Но мир данных не такой.

Идея №1: Костыль через отдельный даг

Первое, что приходит в голову — разбить на два дага. В данных описан именно этот метод.

Запускаешь Dag 1, где Task A через TriggerDagRunOperator создаёт и триггерит Dag 2. В Dag 2 динамически, внутри python_callable оператора-триггера, генерируется веер из Task B.1…B.N. А потом Dag 1 ждёт завершения Dag 2 через ExternalTaskSensor и выполняет Task C.

Это работает. Но это сложно, нужно следить за двумя дагами, а датчик (ExternalTaskSensor) будет постоянно опрашивать метабазу, создавая лишнюю нагрузку. И да, это выглядит как костыль, потому что это и есть костыль.

Решение: Динамический Task Mapping (Airflow 2.3+)

Начиная с Airflow 2.3, появился механизм, который решает эту проблему изящно — Dynamic Task Mapping.

Смысл в том, что Task A возвращает список или словарь с параметрами для выполнения. А следующая задача, объявленная как динамическая (expand), создаст по одной своей копии (маппированной задаче) для каждого элемента этого списка.

Вот как это выглядит в коде:

@task
def task_a():
    # Выполняем логику, которая определяет, сколько нужно B-задач
    # Например, получаем список файлов, ID шардов, названия таблиц
    list_of_items = calculate_items()
    return list_of_items

@task
def task_b(item):
    # Долгая и тяжёлая обработка для каждого элемента
    process_for_hours(item)

@task
def task_c():
    # Агрегируем результаты после всех B
    aggregate_results()

# Сама DAG-структура
list_from_a = task_a()
# Ключевой момент: expand создаст задачи динамически
task_b.expand(item=list_from_a) >> task_c()

После выполнения task_a Airflow увидит возвращённый список и автоматически развернёт N параллельных экземпляров task_b. Все они будут видны в UI как группа, а task_c запустится только после завершения всей этой динамически созданной пачки.

Что выбрать?

Костыль с двумя дагами имеет право на жизнь в старых версиях Airflow (<2.3) или в очень специфичных сценариях изоляции. Но это ад для поддержки.

Dynamic Task Mapping — это каноничный и встроенный способ Airflow решать задачи с неизвестным на стадии компиляции графа параллелизмом. Он проще, нативнее и вся логика остаётся в рамках одного DAG.

Итог: обнови Airflow до 2.3+ и используй expand. Это то, для чего он был создан.