Apache Airflow: Airflow parallelism
Ты настроил параллелизм в airflow.cfg, запустил тяжёлый даг, а он ползёт как улитка. Или наоборот — отжал все ресурсы сервера. Знакомо? Параметров много, назван
Ты настроил параллелизм в airflow.cfg, запустил тяжёлый даг, а он ползёт как улитка. Или наоборот — отжал все ресурсы сервера. Знакомо? Параметров много, названия запутанные, а в документации сухо. Давай разжёвывать.
Что на что влияет
Главный источник путаницы — имена. Они исторические и неочевидные. Откроем airflow.cfg и пройдёмся по полям.
parallelism — это не про процессы или треды. Это жёсткий лимит одновременно активных тасок во всём Airflow. Один инстанс — одна база данных состояния. Если у вас два воркера, они делят этот общий лимит. Лучше представлять его как max_active_tasks_across_airflow.
dag_concurrency — а вот это уже на уровне воркера. Сколько тасок один воркер может взять в работу одновременно. Имя обманчивое — к дагам оно имеет косвенное отношение. По сути max_active_tasks_per_worker.
max_active_runs_per_dag — глобальный дефолт для одноимённого параметра дага. Ограничивает количество параллельных запусков одного дага.
Параметры самого DAG
В коде дага свои настройки, которые бьют поверх глобальных.
concurrency — максимальное количество одновременно работающих тасок внутри одного запуска этого дага. Не путать с dag_concurrency. Логичнее было бы назвать max_active_tasks_inside_dag_run.
max_active_runs — то же самое, что max_active_runs_per_dag, но для конкретного дага. Имя нормальное.
А где же процессы?
А процессы создаёт Executor. LocalExecutor порождает под каждую задачу отдельный процесс, но сколько он может создать — упирается в описанные выше лимиты.
Параметр max_threads в настройках scheduler — это про другое. Он определяет, сколько потоков шедулер будет использовать для самой работы планировщика, а не для исполнения задач. Например, для анализа дагов и постановки задач в очередь.
Итог
Порядок настройки под нагрузку:
- parallelism — выставить общий потолок по задачам для всей системы.
- dag_concurrency — определить, сколько задач может тянуть один воркер (для LocalExecutor — процессы).
- max_active_runs_per_dag — задать дефолт для параллельных запусков дагов.
- В коде каждого дага точечно настроить concurrency и max_active_runs, если нужно.
Меняешь parallelism, но даг всё равно не параллелится? Скорее всего, упёрся в concurrency на уровне дага или в dag_concurrency воркера. Смотри по цепочке снизу вверх.