Errors
Apache Airflow

Apache Airflow: Airflow - How to pass xcom variable into Python function

Ты передал в `params` шаблон Jinja и ждёшь, что Airflow подставит туда значение XCom. А он вместо этого честно передаёт твою строку `'{{ ti.xcom_pull(...) }}'`

Ты передал в params шаблон Jinja и ждёшь, что Airflow подставит туда значение XCom. А он вместо этого честно передаёт твою строку "{{ ti.xcom_pull(...) }}" прямиком в функцию. И она печатает эту самую строку, а не hello world.

Ты не первый, кто на это подписался. PythonOperator — не BashOperator. У каждого оператора есть список полей, которые умеют рендерить шаблоны. Это template_fields.

У BashOperator в template_fields входит bash_command, поэтому {{ ... }} там работает из коробки. А у PythonOperator в template_fields по умолчанию только templates_dict. Поле params туда не входит.

Вот и получается, что твой шаблон в params летит в функцию как есть, красивая строчка с фигурными скобками.

Как надо: op_kwargs и op_args

Поля op_kwargs и op_args у PythonOperator как раз поддерживают рендеринг шаблонов. Передавай аргументы своей функции через них.

def func_archive_s3_file(s3_path_filename):
    print(f"Архивирую: {s3_path_filename}")  # Напечатает "Архивирую: hello world"

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    python_callable=obj.func_archive_s3_file,
    op_kwargs={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"},
    dag=dag
)

Всё. Шаблон в op_kwargs отрендерится, и в функцию придёт уже готовое значение из XCom. Для позиционных аргументов используй op_args.

Альтернатива: контекст

Часто более идиоматичный способ — не тащить XCom через шаблоны, а достать его прямо из контекста внутри функции. Контекст автоматически передаётся, если в функции объявить **context или соответствующий параметр.

def func_archive_s3_file(**context):
    s3_path_filename = context['ti'].xcom_pull(task_ids='submit_file_to_spark')
    print(f"Архивирую: {s3_path_filename}")

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    python_callable=obj.func_archive_s3_file,
    dag=dag
)

Так код чище. Не нужно городить шаблоны в определении DAG, вся логика получения зависимых значений инкапсулирована внутри вызываемой функции.

Корень зла

Если интересно, почему params не работает, а op_kwargs работает — смотри в исходники. В PythonOperator метод prepare_template рендерит именно op_kwargs и op_args.

# Упрощённо
def prepare_template(self):
    self.op_kwargs = self.render_template(self.op_kwargs, self.context)

А params такому рендерингу не подвергается. Он задуман для статичной конфигурации, а не для передачи результатов выполнения других тасок.

Итог

Никакого волшебства. Хочешь передать XCom в Python-функцию — используй op_kwargs/op_args для рендеринга шаблонов. Или просто бери значение из контекста внутри функции.

Первый способ — явный, декларативный. Второй — гибкий, не привязан к шаблонам. Выбор зависит от того, хочешь ли ты видеть зависимости между тасками прямо в определении DAG или предпочитаешь управлять ими внутри кода.