Errors
Apache Airflow

Apache Airflow: Writing to Airflow Logs

Ты написал таск, который должен что-то делать, и хочешь понять, что же он там делает прямо сейчас. Возвращаешь строку из PythonOperator — она появляется в логах

Ты написал таск, который должен что-то делать, и хочешь понять, что же он там делает прямо сейчас. Возвращаешь строку из PythonOperator — она появляется в логах. Работает. Но выглядит как костыль. И хочется больше контроля: дебажные сообщения, информационные, предупреждения.

И тут обнаруживается, что print в продакшене — это как кричать в космосе. Никто не услышит. Airflow не перенаправляет стандартный вывод в свои логи по умолчанию. Нужен правильный канал.

Каноничный способ: модуль logging

Всё уже придумано. Используй встроенный модуль Python logging. Airflow его подхватывает и аккуратно складывает все сообщения в свои логи, привязывая к конкретному дагу и таску.

import logging

def my_task_function():
    logging.info('Начинаю важную работу')
    # ... какой-то код ...
    logging.warning('Что-то пошло не так, но работаю дальше')
    # ... ещё код ...
    logging.error('Всё, приехали')
    return 'Готово'

Это самый чистый и контролируемый метод. Уровни логирования позволяют фильтровать шум от важных событий. debug, info, warning, error, critical — выбирай по обстановке.

Почему это работает

Airflow использует конфигурацию логирования Python. Когда твой код выполняется в среде Airflow, логгер наследует настройки корневого логгера Airflow. Все сообщения идут в тот же поток, что и системные логи планировщика или воркера, и попадают в знакомые тебе файлы или в бэкенд (например, Cloud Logging).

Это даёт тебе полную интеграцию: в UI, в метадате таска, в централизованной системе сбора логов — везде будут твои структурированные сообщения.

А если хочется из контекста?

Иногда нужно логировать, имея под рукой context. Например, добавить run_id или task_instance_key_str. Тут тоже поможет logging, но можно получить логгер для конкретного таска.

def my_task_function(**context):
    # Получаем логгер, привязанный к текущему модулю (или дагу)
    logger = logging.getLogger(__name__)
    # Используем его
    logger.info(f"Запуск с run_id: {context['run_id']}")
    # Или даже так, если хочешь явно указать имя
    task_logger = logging.getLogger('airflow.task')
    task_logger.error("Сообщение через airflow.task")

Разницы в выводе, по сути, нет. Оба способа направят сообщение в нужное русло. Выбор зависит от стиля и желания явно указать источник.

Итог

Забудь про print в продакшн-дагах. Это ненадёжно. Импортируй logging и используй его методы для любых сообщений: от отладочных до критических.

Возврат строки из PythonOperator — это быстрый хак для простых случаев, но для серьёзной работы нужен серьёзный инструмент. Логирование через модуль — это и есть тот инструмент. Он структурирован, уровневый и полностью интегрирован в экосистему Airflow.

Пиши logging.info('Задача стартовала') и спи спокойно. Твои сообщения не потеряются.