Apache Airflow: execution_date in airflow: need to access as a variable
Ты написал даг, где каждая таска дергает REST API через curl. Всё должно работать, но в запросе нужна не текущая дата, а та, за которую этот даг фактически запу
Ты написал даг, где каждая таска дергает REST API через curl. Всё должно работать, но в запросе нужна не текущая дата, а та, за которую этот даг фактически запущен — execution_date. А подставляется datetime.now(). И теперь твой бэкенд получает не те данные, которые ожидал.
Знакомо. Все через это проходили.
execution_date — это не просто переменная
Главная путаница возникает из-за того, как Airflow выполняет твой код. Твой файл с дагом парсится один раз — при загрузке DAG в scheduler. Строка current_datetime = datetime_obj.now(tz=tz.tzlocal()) выполнится в этот момент и навсегда зафиксируется.
Но каждый запуск дага (DAG Run) имеет свой собственный execution_date. Это ключевая концепция. И чтобы подставить именно его в команду, нужно использовать шаблонизацию.
BashOperator.bash_command — это Jinja2 шаблон
Аргумент bash_command в BashOperator — не простая строка. Это Jinja2 шаблон, который Airflow рендерит отдельно для каждого запуска задачи, подставляя в него контекстные переменные.
execution_date — одна из таких переменных. И в шаблоне она доступна как объект datetime.datetime. С ним можно делать всё, что позволяет Python.
Вот как это выглядит на практике:
# В шаблоне можно вызывать методы объекта execution_date
# Например, получить первый день месяца того execution_date, за который запущен даг
command = "some_script.sh {{ execution_date.replace(day=1) }}"
# Или получить последний день предыдущего месяца, используя макросы Airflow
command = "some_script.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}"
Если же тебе нужна просто строка с датой, Airflow предоставляет удобные макросы. {{ ds }} — это дата в формате YYYY-MM-DD. {{ ds_nodash }} — тот же формат, но без дефисов: YYYYMMDD. Полный список есть в документации.
Как исправить твой даг
Нужно сделать две вещи: убрать вычисление current_datetime из глобального кода дага и переписать команду, используя шаблон.
Вот рабочий вариант, собранный из кусков решения:
# Формируем строку команды, где часть параметра - шаблон
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator(
task_id='rest-api-1',
bash_command=command, # Airflow отрендерит {{ ds }} для каждого запуска
dag=dag)
Обрати внимание на % locals() — это старый, но работающий способ подставить переменную hostname в строку до того, как она станет шаблоном для Airflow. Сам шаблон {{ ds }} останется нетронутым и будет заменен уже при выполнении задачи.
Итог
Порядок мыслей: если значение должно быть уникальным для каждого запуска дага — оно должно быть в шаблоне оператора. execution_date, ds, next_ds и другие макросы существуют именно для этого. Используй их, и твои curl-запросы всегда будут попадать в нужную временную точку.