Apache Airflow: How do I force a task on airflow to fail?
Ты написал оператор, который должен падать, если CSV не обработан до конца. Но он упорно светится зелёным, хотя по логике должен гореть красным. Знакомо? Возмож
Ты написал оператор, который должен падать, если CSV не обработан до конца. Но он упорно светится зелёным, хотя по логике должен гореть красным. Знакомо? Возможно, ты уже проверил логи, dependencies и даже перезапустил scheduler, но задача всё равно завершается success.
Дело в том, что PythonOperator по умолчанию интерпретирует успешное выполнение функции как успех задачи. Он не смотрит на возвращаемое значение (если это не True/False для некоторых триггеров). Возврат False или None — это не исключение, это просто результат функции. Задача отработала, код выполнился, Airflow доволен.
Вот такой код никого не волнует:
def process_csv_entries(csv_file):
file_completely_parsed = <call_to_module_to_parse_csv>
return not file_completely_parsed # Вернёт True или False
Оператор выполнит функцию, получит True или False и спокойно завершится со статусом success. Ему всё равно. Ему важно только одно: не упала ли функция с исключением.
Поэтому правило простое: хочешь уронить задачу — роняй её явно. Вручную. Без жалости.
Вместо того чтобы возвращать флаг, нужно бросить исключение при нарушении условия. Как только интерпретатор Python видит raise, выполнение обрывается, и Airflow фиксирует состояние failed.
Исправленный код выглядит так:
def process_csv_entries(csv_file):
file_completely_parsed = <call_to_module_to_parse_csv>
if not file_completely_parsed:
raise ValueError('File not parsed completely/correctly')
# Если всё хорошо — функция завершается без return, задача успешна
Теперь логика на твоей стороне. Если парсинг не удался — задача немедленно падает с понятной ошибкой в логе. В UI появится красный крестик, а в уведомлениях — сообщение ValueError.
Можно использовать и другие типы исключений, более специфичные: RuntimeError, IOError, или кастомное. Главное — чтобы оно было необработанным внутри функции. Это сигнал для Airflow, что что-то пошло не так.
Почему это работает? Потому что Airflow оборачивает выполнение твоей функции в try-except. Любое непойманное исключение перехватывается фреймворком и переводит состояние задачи в failed. Возвращаемое значение при этом не анализируется.
Так что запомни: PythonOperator заставляет задачу упасть только одним способом — через исключение. Никакие хитрые return False, sys.exit(1) или изменение переменных контекста не сработают. Только raise.
Дальше можно настраивать логику: несколько проверок в функции, разные исключения с понятными сообщениями, повторные попытки через retries. Но фундамент один — если бизнес-логика не выполнена, поднимай ошибку. Airflow сделает всё остальное.
Итог: не возвращай флаг, бросай исключение. Одна строчка raise сэкономит часы отладки и сделает твой даг предсказуемым.