PySpark Schema Testing: Never Ship a Broken Schema Again
PySpark schema bugs are the ones that get you at 3am. A nullable=False column that starts accepting nulls after a migration. A StringType that was supposed to be DoubleType. A schema in the code that quietly drifted from what's actually landing in the data lake.
These don't cause loud crashes. They cause silent data corruption that you find out about much later, usually when someone's looking at a dashboard and something doesn't add up.
The fix is schema tests. The problem is that almost nobody writes them — because writing them by hand is genuinely annoying. PySpark tests need a SparkSession. A SparkSession in CI needs either a real local Spark install or a mocked fixture that doesn't really match cluster behavior. It's 50+ lines of boilerplate per schema, and the boilerplate rarely gets written.
What schema tests actually need to check
Take this schema:
```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType

payment_schema = StructType([
    StructField("transaction_id", StringType(), nullable=False),
    StructField("amount", DoubleType(), nullable=False),
    StructField("currency", StringType(), nullable=False),
    StructField("is_refund", BooleanType(), nullable=True),
])
```
Two categories of things to verify:
Nullability constraints — transaction_id, amount, and currency all say nullable=False. That's a promise that inserting a null should fail. If someone changes those to nullable=True during a migration, you want a test to catch it.
Type correctness — amount is DoubleType. If a pipeline change accidentally casts it to StringType, you want that caught before it reaches your data lake, not after someone notices the aggregations are broken.
Generating them automatically
Quell has a PySpark reader that parses StructType definitions directly from the AST — without importing pyspark at scan time. This matters because most CI environments don't have Spark installed just to run a scan.
```
pip install quelltest
quell check src/schemas.py --no-llm
```
```
payment_schema  NOT_NULL    transaction_id must not be null      ✗ no test
payment_schema  NOT_NULL    amount must not be null              ✗ no test
payment_schema  NOT_NULL    currency must not be null            ✗ no test
payment_schema  TYPE_CHECK  payment_schema matches StructType    ✗ no test
```
With --fix:
```
quell check src/schemas.py --fix --no-llm

payment_schema  NOT_NULL    transaction_id   ✓ verified → written
payment_schema  NOT_NULL    amount           ✓ verified → written
payment_schema  NOT_NULL    currency         ✓ verified → written
payment_schema  TYPE_CHECK  payment_schema   ✓ verified → written

4 tests written → tests/test_schemas.py
```
Your code never left your machine.
What gets written
The NOT_NULL tests try to insert a null value for each constrained column:
```python
import pytest
from pyspark.sql import Row, SparkSession
from pyspark.sql.utils import AnalysisException
from schemas import payment_schema  # the module Quell scanned

def test_payment_schema_not_null_transaction_id(spark: SparkSession):
    with pytest.raises((AnalysisException, Exception)):
        spark.createDataFrame(
            [Row(transaction_id=None, amount=1.0, currency="USD", is_refund=False)],
            schema=payment_schema,
        ).collect()
```
The TYPE_CHECK test compares the actual schema against what the code declares, using chispa:
```python
from chispa.schema_comparer import assert_schema_equality

def test_payment_schema_type_check(spark: SparkSession):
    expected = StructType([
        StructField("transaction_id", StringType(), nullable=False),
        StructField("amount", DoubleType(), nullable=False),
        StructField("currency", StringType(), nullable=False),
        StructField("is_refund", BooleanType(), nullable=True),
    ])
    assert_schema_equality(payment_schema, expected)
```
Quell also creates a conftest.py with a session-scoped SparkSession if one doesn't exist, so you don't have to set that up manually:
```python
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    return (
        SparkSession.builder
        .master("local[1]")
        .appName("quell-tests")
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate()
    )
```
The verification step
Every generated schema test goes through two rounds before being written:
First, it runs against your actual schema definition — must pass. If the schema is correctly defined with nullable=False, inserting a null should raise an exception.
Then Quell temporarily flips the field to nullable=True and runs the test again — must fail. This confirms the test is actually checking the constraint, not just coincidentally passing.
If a test passes both rounds, it's proven. If either round goes wrong, the test is discarded. Nothing gets written to disk that hasn't been shown to catch a real violation.
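The round-two mutation can be pictured at the source level. This is only a sketch of the idea — `flip_nullable` is a hypothetical helper, and a regex stands in for whatever Quell actually does to the parsed schema:

```python
import re


def flip_nullable(source, field):
    """Return a mutant of the schema source where one field's
    nullable=False becomes nullable=True. Sketch only: assumes the
    usual one-StructField-per-line layout, since '.' stops at newlines."""
    pattern = re.compile(
        r'(StructField\(\s*"%s".*?nullable\s*=\s*)False' % re.escape(field)
    )
    return pattern.sub(r"\1True", source)
```

Running the generated null-insertion test against this mutant should make it fail; if it still passes, the test wasn't really exercising the constraint and gets thrown away.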
Schemas inside functions
Quell walks function bodies and return statements, not just module-level assignments:
```python
def get_events_schema() -> StructType:
    return StructType([
        StructField("event_id", StringType(), nullable=False),
        StructField("user_id", StringType(), nullable=False),
        StructField("timestamp", DoubleType(), nullable=False),
    ])
```
Function-returned schemas are covered with the same generation and verification.
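Finding these is a small extension of the scan: instead of looking only at assignments, also look at Return nodes inside function definitions. A minimal stdlib sketch (not Quell's implementation, and again limited to direct StructType calls):

```python
import ast


def schemas_in_returns(source):
    """Map function names to the StructType(...) call node found in
    their return statements."""
    found = {}
    for fn in ast.walk(ast.parse(source)):
        if not isinstance(fn, ast.FunctionDef):
            continue
        for node in ast.walk(fn):
            if (isinstance(node, ast.Return)
                    and isinstance(node.value, ast.Call)
                    and isinstance(node.value.func, ast.Name)
                    and node.value.func.id == "StructType"):
                found[fn.name] = node.value
    return found
```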
Adding it to CI
```
pip install quelltest[pyspark]
quell check src/ --no-llm --format json
```
The JSON output is machine-readable — you can pipe it into a threshold gate or a dashboard. If requirement coverage drops below a set threshold, fail the build. Schema regressions get caught in PR review instead of production.
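A threshold gate can be a few lines of Python. Note the JSON shape below — a "requirements" list with a "covered" flag — is an assumed example, not Quell's documented output format; adapt the keys to what the real report contains:

```python
import json


def coverage_gate(report_json, threshold):
    """Return a CI exit code: 0 if the share of covered requirements
    meets the threshold, 1 otherwise. The report shape is an assumption."""
    reqs = json.loads(report_json)["requirements"]
    covered = sum(1 for r in reqs if r["covered"])
    coverage = covered / len(reqs) if reqs else 1.0
    print(f"schema requirement coverage: {coverage:.0%}")
    return 0 if coverage >= threshold else 1
```

Wired into CI as something like `sys.exit(coverage_gate(Path("report.json").read_text(), 0.9))`, a dropped constraint fails the pull request instead of landing in the lake.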
Read the PySpark schema docs, or jump straight to pip install quelltest[pyspark].