Errors
Apache Airflow

Apache Airflow: Airflow parallelism

Ты настроил параллелизм в airflow.cfg, запустил тяжёлый даг, а он ползёт как улитка. Или наоборот — отжал все ресурсы сервера. Знакомо? Параметров много, назван

Ты настроил параллелизм в airflow.cfg, запустил тяжёлый даг, а он ползёт как улитка. Или наоборот — отжал все ресурсы сервера. Знакомо? Параметров много, названия запутанные, а в документации сухо. Давай разжёвывать.

Что на что влияет

Главный источник путаницы — имена. Они исторические и неочевидные. Откроем airflow.cfg и пройдёмся по полям.

parallelism — это не про процессы или треды. Это жёсткий лимит одновременно активных тасок во всём Airflow. Один инстанс — одна база данных состояния. Если у вас два воркера, они делят этот общий лимит. Лучше представлять его как max_active_tasks_across_airflow.

dag_concurrency — а вот это уже на уровне воркера. Сколько тасок один воркер может взять в работу одновременно. Имя обманчивое — к дагам оно имеет косвенное отношение. По сути max_active_tasks_per_worker.

max_active_runs_per_dag — глобальный дефолт для одноимённого параметра дага. Ограничивает количество параллельных запусков одного дага.

Параметры самого DAG

В коде дага свои настройки, которые бьют поверх глобальных.

concurrency — максимальное количество одновременно работающих тасок внутри одного запуска этого дага. Не путать с dag_concurrency. Логичнее было бы назвать max_active_tasks_inside_dag_run.

max_active_runs — то же самое, что max_active_runs_per_dag, но для конкретного дага. Имя нормальное.

А где же процессы?

А процессы создаёт Executor. LocalExecutor порождает под каждую задачу отдельный процесс, но сколько он может создать — упирается в описанные выше лимиты.

Параметр max_threads в настройках scheduler — это про другое. Он определяет, сколько потоков шедулер будет использовать для самой работы планировщика, а не для исполнения задач. Например, для анализа дагов и постановки задач в очередь.

Итог

Порядок настройки под нагрузку:

  1. parallelism — выставить общий потолок по задачам для всей системы.
  2. dag_concurrency — определить, сколько задач может тянуть один воркер (для LocalExecutor — процессы).
  3. max_active_runs_per_dag — задать дефолт для параллельных запусков дагов.
  4. В коде каждого дага точечно настроить concurrency и max_active_runs, если нужно.

Меняешь parallelism, но даг всё равно не параллелится? Скорее всего, упёрся в concurrency на уровне дага или в dag_concurrency воркера. Смотри по цепочке снизу вверх.