Apache Airflow: Airflow - How to pass xcom variable into Python function
Ты передал в `params` шаблон Jinja и ждёшь, что Airflow подставит туда значение XCom. А он вместо этого честно передаёт твою строку `'{{ ti.xcom_pull(...) }}'`
Ты передал в params шаблон Jinja и ждёшь, что Airflow подставит туда значение XCom. А он вместо этого честно передаёт твою строку "{{ ti.xcom_pull(...) }}" прямиком в функцию. И она печатает эту самую строку, а не hello world.
Ты не первый, кто на это подписался. PythonOperator — не BashOperator. У каждого оператора есть список полей, которые умеют рендерить шаблоны. Это template_fields.
У BashOperator в template_fields входит bash_command, поэтому {{ ... }} там работает из коробки. А у PythonOperator в template_fields по умолчанию только templates_dict. Поле params туда не входит.
Вот и получается, что твой шаблон в params летит в функцию как есть, красивая строчка с фигурными скобками.
Как надо: op_kwargs и op_args
Поля op_kwargs и op_args у PythonOperator как раз поддерживают рендеринг шаблонов. Передавай аргументы своей функции через них.
def func_archive_s3_file(s3_path_filename):
print(f"Архивирую: {s3_path_filename}") # Напечатает "Архивирую: hello world"
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
python_callable=obj.func_archive_s3_file,
op_kwargs={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"},
dag=dag
)
Всё. Шаблон в op_kwargs отрендерится, и в функцию придёт уже готовое значение из XCom. Для позиционных аргументов используй op_args.
Альтернатива: контекст
Часто более идиоматичный способ — не тащить XCom через шаблоны, а достать его прямо из контекста внутри функции. Контекст автоматически передаётся, если в функции объявить **context или соответствующий параметр.
def func_archive_s3_file(**context):
s3_path_filename = context['ti'].xcom_pull(task_ids='submit_file_to_spark')
print(f"Архивирую: {s3_path_filename}")
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
python_callable=obj.func_archive_s3_file,
dag=dag
)
Так код чище. Не нужно городить шаблоны в определении DAG, вся логика получения зависимых значений инкапсулирована внутри вызываемой функции.
Корень зла
Если интересно, почему params не работает, а op_kwargs работает — смотри в исходники. В PythonOperator метод prepare_template рендерит именно op_kwargs и op_args.
# Упрощённо
def prepare_template(self):
self.op_kwargs = self.render_template(self.op_kwargs, self.context)
А params такому рендерингу не подвергается. Он задуман для статичной конфигурации, а не для передачи результатов выполнения других тасок.
Итог
Никакого волшебства. Хочешь передать XCom в Python-функцию — используй op_kwargs/op_args для рендеринга шаблонов. Или просто бери значение из контекста внутри функции.
Первый способ — явный, декларативный. Второй — гибкий, не привязан к шаблонам. Выбор зависит от того, хочешь ли ты видеть зависимости между тасками прямо в определении DAG или предпочитаешь управлять ими внутри кода.