Shashank Bindal · 4 min read

PySpark Schema Testing: Never Ship a Broken Schema Again

Learn how to automatically generate and verify PySpark schema tests from StructType definitions — no pyspark import required at scan time, no boilerplate, no manual test writing.


PySpark schema bugs are the ones that get you at 3am. A nullable=False column that starts accepting nulls after a migration. A StringType that was supposed to be DoubleType. A schema in the code that quietly drifted from what's actually landing in the data lake.

These don't cause loud crashes. They cause silent data corruption that you find out about much later, usually when someone's looking at a dashboard and something doesn't add up.

The fix is schema tests. The problem is that almost nobody writes them — because writing them by hand is genuinely annoying. PySpark tests need a SparkSession. A SparkSession in CI needs either a real local Spark install or a mocked fixture that doesn't really match cluster behavior. It's 50+ lines of boilerplate per schema, and the boilerplate rarely gets written.

What schema tests actually need to check

Take this schema:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType

payment_schema = StructType([
    StructField("transaction_id", StringType(), nullable=False),
    StructField("amount",         DoubleType(),  nullable=False),
    StructField("currency",       StringType(),  nullable=False),
    StructField("is_refund",      BooleanType(), nullable=True),
])

Two categories of things to verify:

Nullability constraints: transaction_id, amount, and currency all say nullable=False. That's a promise that inserting a null should fail. If someone changes those to nullable=True during a migration, you want a test to catch it.

Type correctness: amount is DoubleType. If a pipeline change accidentally casts it to StringType, you want that caught before it reaches your data lake, not after someone notices the aggregations are broken.
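
Both are promises Spark itself can check when rows are verified against the schema. A quick sketch, assuming a local SparkSession named spark and the payment_schema above (with verifySchema left at its default, both calls should raise):

# Null in a non-nullable column: rejected because amount says nullable=False.
spark.createDataFrame(
    [("t-1", None, "USD", False)],
    schema=payment_schema,
)

# Wrong type: rejected because DoubleType cannot accept the string "12.50".
spark.createDataFrame(
    [("t-1", "12.50", "USD", False)],
    schema=payment_schema,
)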

Generating them automatically

Quell has a PySpark reader that parses StructType definitions directly from the AST — without importing pyspark at scan time. This matters because most CI environments don't have Spark installed just to run a scan.
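
As a rough illustration of the approach (a sketch, not Quell's actual reader), Python's standard ast module is enough to find StructType(...) assignments without importing pyspark:

import ast

def find_structtype_assignments(source: str) -> list[str]:
    # Collect the names of variables assigned a StructType(...) call,
    # whether it is written as StructType(...) or T.StructType(...).
    names = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Assign) and isinstance(node.value, ast.Call):
            func = node.value.func
            called = func.id if isinstance(func, ast.Name) else getattr(func, "attr", "")
            if called == "StructType":
                names += [t.id for t in node.targets if isinstance(t, ast.Name)]
    return names

# find_structtype_assignments(open("src/schemas.py").read()) -> ["payment_schema"]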

pip install quelltest
quell check src/schemas.py --no-llm
  payment_schema   NOT_NULL    transaction_id must not be null   ✗ no test
  payment_schema   NOT_NULL    amount must not be null            ✗ no test
  payment_schema   NOT_NULL    currency must not be null          ✗ no test
  payment_schema   TYPE_CHECK  payment_schema matches StructType  ✗ no test

With --fix:

quell check src/schemas.py --fix --no-llm
  payment_schema   NOT_NULL    transaction_id   ✓ verified → written
  payment_schema   NOT_NULL    amount           ✓ verified → written
  payment_schema   NOT_NULL    currency         ✓ verified → written
  payment_schema   TYPE_CHECK  payment_schema   ✓ verified → written

  4 tests written → tests/test_schemas.py
  Your code never left your machine.

What gets written

The NOT_NULL tests try to insert a null value for each constrained column:

def test_payment_schema_not_null_transaction_id(spark: SparkSession):
    with pytest.raises((AnalysisException, Exception)):
        spark.createDataFrame(
            [Row(transaction_id=None, amount=1.0, currency="USD", is_refund=False)],
            schema=payment_schema,
        ).collect()

The TYPE_CHECK test pins the declared schema against an expected copy, using chispa, so any later edit to the StructType shows up as a test failure:

def test_payment_schema_type_check(spark: SparkSession):
    expected = StructType([
        StructField("transaction_id", StringType(), nullable=False),
        StructField("amount",         DoubleType(),  nullable=False),
        StructField("currency",       StringType(),  nullable=False),
        StructField("is_refund",      BooleanType(), nullable=True),
    ])
    assert_schema_equality(payment_schema, expected)

Quell also creates a conftest.py with a session-scoped SparkSession if one doesn't exist, so you don't have to set that up manually:

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    return (
        SparkSession.builder
        .master("local[1]")
        .appName("quell-tests")
        .config("spark.sql.shuffle.partitions", "1")
        .getOrCreate()
    )
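
The local[1] master and single shuffle partition keep the session cheap enough for CI, and because the fixture is session-scoped, every generated test file shares one SparkSession. From there, pytest tests/ runs the whole suite.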

The verification step

Every generated schema test goes through two rounds before being written:

First, it runs against your actual schema definition — must pass. If the schema is correctly defined with nullable=False, inserting a null should raise an exception.

Then Quell temporarily flips the field to nullable=True and runs the test again — must fail. This confirms the test is actually checking the constraint, not just coincidentally passing.

If a test passes both rounds, it's proven. If either round goes wrong, the test is discarded. Nothing gets written to disk that hasn't been shown to catch a real violation.
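
The mutation in round two boils down to building a copy of the schema with one field relaxed. A minimal sketch of that idea (not Quell's internals):

from pyspark.sql.types import StructField, StructType

def flip_nullable(schema: StructType, field_name: str) -> StructType:
    # Copy the schema, but mark the named field as nullable=True.
    return StructType([
        StructField(f.name, f.dataType, nullable=True) if f.name == field_name else f
        for f in schema.fields
    ])

# Round two: re-run the generated NOT_NULL test against
# flip_nullable(payment_schema, "transaction_id") and expect it to fail.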

Schemas inside functions

Quell walks function bodies and return statements, not just module-level assignments:

def get_events_schema() -> StructType:
    return StructType([
        StructField("event_id",   StringType(), nullable=False),
        StructField("user_id",    StringType(), nullable=False),
        StructField("timestamp",  DoubleType(),  nullable=False),
    ])

Function-returned schemas get the same generation and verification as module-level ones.
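
Presumably the written TYPE_CHECK test just calls the function and pins the result, something along these lines (a sketch, not Quell's literal output):

def test_get_events_schema_type_check(spark: SparkSession):
    expected = StructType([
        StructField("event_id",  StringType(), nullable=False),
        StructField("user_id",   StringType(), nullable=False),
        StructField("timestamp", DoubleType(), nullable=False),
    ])
    assert_schema_equality(get_events_schema(), expected)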

Adding it to CI

pip install quelltest[pyspark]
quell check src/ --no-llm --format json

The JSON output is machine-readable — you can pipe it into a threshold gate or a dashboard. If requirement coverage drops below a set threshold, fail the build. Schema regressions get caught in PR review instead of production.
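
A threshold gate can be a handful of lines. The report structure below is an assumption (the field names are illustrative; check what your quell version actually emits):

import json
import sys

# Assumes: quell check src/ --no-llm --format json > quell-report.json
# The "requirements" / "has_test" field names are hypothetical.
report = json.load(open("quell-report.json"))
reqs = report["requirements"]
coverage = sum(1 for r in reqs if r["has_test"]) / max(len(reqs), 1)

if coverage < 0.9:
    sys.exit(f"requirement coverage {coverage:.0%} is below the 90% gate")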


See the PySpark schema docs — or jump straight to pip install quelltest[pyspark].

Try Quelltest

Install Quelltest and run it on your codebase — no API key, no configuration.