Apache Airflow: How to control the parallelism or concurrency of an Airflow installation?
Твои даги висят в состоянии running, а задачи в очереди, хотя воркеры вроде и не загружены. Или наоборот — Airflow хавает все ресурсы кластера, и ты хочешь его
Твои даги висят в состоянии running, а задачи в очереди, хотя воркеры вроде и не загружены. Или наоборот — Airflow хавает все ресурсы кластера, и ты хочешь его притормозить. Знакомая история.
Параллелизм в Airflow — это не одна кнопка, а набор “рычагов”. Настраивать их нужно аккуратно, иначе упрешься в неочевидные лимиты.
Рычаг первый — на уровне DAG
Сам даг может иметь свои лимиты. Их два.
concurrency — это сколько тасок всего могут одновременно работать во всех активных раннах этого DAG. Если поставил concurrency=5, то шестая задача будет ждать, даже если раннов десять.
max_active_runs — сколько одновременно активных запусков одного DAG может быть. Новый запуск не создастся, пока не завершится один из предыдущих.
Выставляются прямо в коде:
# Два параллельных ранна, но всего 5 тасок между ними
dag = DAG('example', concurrency=5, max_active_runs=2)
Если не задать, Airflow возьмет значения по умолчанию из core.dag_concurrency и core.max_active_runs_per_dag в airflow.cfg.
Рычаг второй — на уровне таски
Здесь тоже два основных инструмента.
pool — самый мощный. Создаешь пул с именем и лимитом слотов (через UI или API). Назначаешь таске этот пул. Теперь параллельность ограничена только для тасок в этом пуле, остальные задачи его не видят. Идеально для ограничения доступа к редкому ресурсу вроде внешнего API или мощной БД.
# Эта задача будет ждать свободного слота в пуле 'api_pool'
my_task = PythonOperator(
task_id='heavy_api_call',
pool='api_pool',
dag=dag
)
max_active_tis_per_dag — менее известный, но полезный. Ограничивает количество одновременно работающих одинаковых тасок в разных запусках одного DAG. Помогает, когда одна и та же задача в параллельных раннах не должна бегать в бесконечных копиях.
Общая картина
Не забудь про глобальные лимиты. Параметр core.parallelism — это абсолютный максимум одновременно выполняемых тасок во всем Airflow. А core.dag_concurrency — дефолтный лимит для дага, если свой не указан.
Как настраивать? Сверху вниз.
- Сначала
core.parallelism— общий потолок для установки. - Потом
core.dag_concurrencyиcore.max_active_runs_per_dag— дефолты для всех дагов. - Далее точечно настраиваешь конкретные DAG через их параметры.
- И наконец, создаешь
poolsдля самых “жадных” операторов.
Если нужно снизить нагрузку — уменьшай лимиты на уровне дагов или создавай пулы с малым числом слотов для ресурсоемких задач. Хочешь больше параллелизма — поднимай core.parallelism и лимиты конкретных дагов.
Главное правило: меняй одно значение за раз и смотри, что происходит. Иначе можно долго гадать, какой именно лимит тебя сейчас держит.