Errors
Apache Airflow

Apache Airflow: How to control the parallelism or concurrency of an Airflow installation?

Твои даги висят в состоянии running, а задачи в очереди, хотя воркеры вроде и не загружены. Или наоборот — Airflow хавает все ресурсы кластера, и ты хочешь его

Твои даги висят в состоянии running, а задачи в очереди, хотя воркеры вроде и не загружены. Или наоборот — Airflow хавает все ресурсы кластера, и ты хочешь его притормозить. Знакомая история.

Параллелизм в Airflow — это не одна кнопка, а набор “рычагов”. Настраивать их нужно аккуратно, иначе упрешься в неочевидные лимиты.

Рычаг первый — на уровне DAG

Сам даг может иметь свои лимиты. Их два.

concurrency — это сколько тасок всего могут одновременно работать во всех активных раннах этого DAG. Если поставил concurrency=5, то шестая задача будет ждать, даже если раннов десять.

max_active_runs — сколько одновременно активных запусков одного DAG может быть. Новый запуск не создастся, пока не завершится один из предыдущих.

Выставляются прямо в коде:

# Два параллельных ранна, но всего 5 тасок между ними
dag = DAG('example', concurrency=5, max_active_runs=2)

Если не задать, Airflow возьмет значения по умолчанию из core.dag_concurrency и core.max_active_runs_per_dag в airflow.cfg.

Рычаг второй — на уровне таски

Здесь тоже два основных инструмента.

pool — самый мощный. Создаешь пул с именем и лимитом слотов (через UI или API). Назначаешь таске этот пул. Теперь параллельность ограничена только для тасок в этом пуле, остальные задачи его не видят. Идеально для ограничения доступа к редкому ресурсу вроде внешнего API или мощной БД.

# Эта задача будет ждать свободного слота в пуле 'api_pool'
my_task = PythonOperator(
    task_id='heavy_api_call',
    pool='api_pool',
    dag=dag
)

max_active_tis_per_dag — менее известный, но полезный. Ограничивает количество одновременно работающих одинаковых тасок в разных запусках одного DAG. Помогает, когда одна и та же задача в параллельных раннах не должна бегать в бесконечных копиях.

Общая картина

Не забудь про глобальные лимиты. Параметр core.parallelism — это абсолютный максимум одновременно выполняемых тасок во всем Airflow. А core.dag_concurrency — дефолтный лимит для дага, если свой не указан.

Как настраивать? Сверху вниз.

  1. Сначала core.parallelism — общий потолок для установки.
  2. Потом core.dag_concurrency и core.max_active_runs_per_dag — дефолты для всех дагов.
  3. Далее точечно настраиваешь конкретные DAG через их параметры.
  4. И наконец, создаешь pools для самых “жадных” операторов.

Если нужно снизить нагрузку — уменьшай лимиты на уровне дагов или создавай пулы с малым числом слотов для ресурсоемких задач. Хочешь больше параллелизма — поднимай core.parallelism и лимиты конкретных дагов.

Главное правило: меняй одно значение за раз и смотри, что происходит. Иначе можно долго гадать, какой именно лимит тебя сейчас держит.