Great Expectations: Great_Expectations Conditional Expectation in Spark 3.2.1 with Pandas API in DataBricks
Ты ~~настроил~~ Great Expectations в DataBricks, хочешь проверить данные по условию. Открываешь документацию, копируешь пример с `condition` — а он не работает.
Ты настроил Great Expectations в DataBricks, хочешь проверить данные по условию. Открываешь документацию, копируешь пример с condition — а он не работает. Среда-то у тебя Spark 3.2.1, а в доке чётко сказано: только Pandas.
И ты такой стоишь перед выбором: или переписать всё под Pandas, или искать костыль. Квазары тут ни при чём, просто архитектура.
Почему Spark не дружит с условными ожиданиями
Всё просто. Great Expectations — это про валидацию датафреймов. Условные ожидания (conditional_expectations) — это когда ты применяешь проверку не ко всем строкам, а только к тем, которые подходят под условие.
В Pandas это работает, потому что там всё происходит в памяти одного процесса. А Spark — распределённая штука. Его движок вычислений не умеет выполнять произвольный Python-код условия в контексте каждой строки так, как это делает Pandas. Точнее, умеет через Pandas UDF, но GE пока не интегрировал эту возможность.
Документация не врёт. Параметр condition в большинстве ожиданий имеет жёсткую привязку:
# Так работает только для execution_engine="pandas"
expect_column_values_to_be_between(
column="price",
min_value=0,
max_value=100,
condition=col("type") == "discounted" # Вот эта штука
)
Для Spark 3.2.1 с Pandas API on Spark (pyspark.pandas) это тоже не сработает. Под капотом GE видит Spark-датафрейм и честно пытается применить ожидание ко всем данным, но натыкается на несовместимость.
Что делать, если очень надо
Вариант первый, грубый и простой. Перевести данные в Pandas на драйвере. Подходит только для данных, которые влезают в память одной машины.
# В Databricks это может быть дорого, но работает
pandas_df = spark_df.toPandas()
# Дальше запускаешь GE с execution_engine="pandas"
Вариант второй, более Spark-way. Разбить одну проверку на две. Фильтруешь данные сам, а потом применяешь обычное (не условное) ожидание к каждой части.
# Вместо одного conditional expectation делаешь два датафрейма
discounted_df = spark_df.filter(col("type") == "discounted")
regular_df = spark_df.filter(col("type") != "discounted")
# И к каждому применяешь свою проверку
validator.expect_column_values_to_be_between(
column="price", min_value=0, max_value=50,
meta={"dataset": "discounted"}
)
validator.expect_column_values_to_be_between(
column="price", min_value=10, max_value=100,
meta={"dataset": "regular"}
)
Минус — дублирование кода. Плюс — всё работает на нативных Spark-трансформациях, масштабируется и понятно.
Вариант третий. Не использовать условные ожидания из коробки GE. Написать кастомное ожидание, которое внутри будет использовать pyspark.sql.functions.when(). Это чисто Spark-подход, но нужно кодить.
Итог
Условные ожидания в Great Expectations — это пока прерогатива Pandas. В Spark 3.2.1, даже с Pandas API, прямой путь — фильтровать датафрейм вручную перед проверкой.
Если данные небольшие, можно упасть на драйвер в Pandas. Если большие — разбивай на логические группы и валидируй каждую отдельно. И следи за релизами GE — может, когда-нибудь и добавят нативную поддержку Spark для condition.