Apache Airflow: Writing to Airflow Logs
Ты написал таск, который должен что-то делать, и хочешь понять, что же он там делает прямо сейчас. Возвращаешь строку из PythonOperator — она появляется в логах
Ты написал таск, который должен что-то делать, и хочешь понять, что же он там делает прямо сейчас. Возвращаешь строку из PythonOperator — она появляется в логах. Работает. Но выглядит как костыль. И хочется больше контроля: дебажные сообщения, информационные, предупреждения.
И тут обнаруживается, что print в продакшене — это как кричать в космосе. Никто не услышит. Airflow не перенаправляет стандартный вывод в свои логи по умолчанию. Нужен правильный канал.
Каноничный способ: модуль logging
Всё уже придумано. Используй встроенный модуль Python logging. Airflow его подхватывает и аккуратно складывает все сообщения в свои логи, привязывая к конкретному дагу и таску.
import logging
def my_task_function():
logging.info('Начинаю важную работу')
# ... какой-то код ...
logging.warning('Что-то пошло не так, но работаю дальше')
# ... ещё код ...
logging.error('Всё, приехали')
return 'Готово'
Это самый чистый и контролируемый метод. Уровни логирования позволяют фильтровать шум от важных событий. debug, info, warning, error, critical — выбирай по обстановке.
Почему это работает
Airflow использует конфигурацию логирования Python. Когда твой код выполняется в среде Airflow, логгер наследует настройки корневого логгера Airflow. Все сообщения идут в тот же поток, что и системные логи планировщика или воркера, и попадают в знакомые тебе файлы или в бэкенд (например, Cloud Logging).
Это даёт тебе полную интеграцию: в UI, в метадате таска, в централизованной системе сбора логов — везде будут твои структурированные сообщения.
А если хочется из контекста?
Иногда нужно логировать, имея под рукой context. Например, добавить run_id или task_instance_key_str. Тут тоже поможет logging, но можно получить логгер для конкретного таска.
def my_task_function(**context):
# Получаем логгер, привязанный к текущему модулю (или дагу)
logger = logging.getLogger(__name__)
# Используем его
logger.info(f"Запуск с run_id: {context['run_id']}")
# Или даже так, если хочешь явно указать имя
task_logger = logging.getLogger('airflow.task')
task_logger.error("Сообщение через airflow.task")
Разницы в выводе, по сути, нет. Оба способа направят сообщение в нужное русло. Выбор зависит от стиля и желания явно указать источник.
Итог
Забудь про print в продакшн-дагах. Это ненадёжно. Импортируй logging и используй его методы для любых сообщений: от отладочных до критических.
Возврат строки из PythonOperator — это быстрый хак для простых случаев, но для серьёзной работы нужен серьёзный инструмент. Логирование через модуль — это и есть тот инструмент. Он структурирован, уровневый и полностью интегрирован в экосистему Airflow.
Пиши logging.info('Задача стартовала') и спи спокойно. Твои сообщения не потеряются.