Testing Pipeline Steps with Assertions
Add row-count checks, null assertions, and expected-column guards at each stage so errors surface immediately.
Why Test Pipeline Steps?
A data pipeline that runs without errors can still produce silently wrong output: rows dropped by the wrong filter, a column multiplied by the wrong factor, or a merge that duplicates rows because the join key was not unique. A dependable way to catch these silent failures is to embed assertions that verify expected properties of the data at each pipeline stage. Assertions turn logical bugs into loud, immediately visible errors.
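A minimal sketch of such a stage-level check follows. The helper name `check_stage` and the column names are hypothetical, chosen only for illustration; they do not come from pandas. The helper asserts that the expected columns are present and that the required columns contain no nulls:

```python
import pandas as pd


def check_stage(df, expected_cols, non_null_cols=()):
    """Hypothetical guard: fail loudly if a stage's output is malformed."""
    # Expected-column guard: every named column must be present.
    missing = set(expected_cols) - set(df.columns)
    assert not missing, f'missing columns: {sorted(missing)}'
    # Null assertion: the listed columns must not contain missing values.
    for col in non_null_cols:
        n_null = int(df[col].isna().sum())
        assert n_null == 0, f'column {col!r} has {n_null} null value(s)'
    return df  # returned unchanged so the check can sit between steps


# Illustrative data (assumed column names).
orders = pd.DataFrame({'quantity': [2, 3], 'unit_price': [5.0, 4.0]})
check_stage(orders,
            expected_cols=['quantity', 'unit_price'],
            non_null_cols=['quantity', 'unit_price'])
```

Because the helper returns the frame it was given, it could also be chained between steps with `DataFrame.pipe`, for example `orders.pipe(check_stage, ['quantity', 'unit_price'])`.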
import pandas as pd
import numpy as np

# A transform that looks correct but has a bug:
def compute_revenue_buggy(df):
    # BUG: unit_price should be multiplied, not added
    df['revenue'] = df['quantity'] + df['unit_price']
    return df

# Without assertions, this runs silently with wrong numbers.
Row Count Checks
After every filtering or joining step, assert that the resulting row count falls within an expected range. Too few rows suggests that a filter was too aggressive; too many suggests that a join duplicated records. Express the expected range as a fraction of the input: for example, a null-drop step might be allowed to remove at most 30% of rows in healthy data. This guard also catches unexpected data-quality changes in upstream sources.
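For the "too many rows" case, one possible sketch is a wrapper around a left merge that asserts the output has not grown beyond the input. The function name `merge_guarded` and its `max_growth` parameter are hypothetical, introduced here only for illustration:

```python
import pandas as pd


def merge_guarded(left, right, on, max_growth=1.0):
    """Hypothetical left-merge wrapper that asserts the row count did not grow."""
    before = len(left)
    result = left.merge(right, on=on, how='left')
    after = len(result)
    # A left merge against a unique key returns exactly `before` rows.
    limit = before * max_growth
    assert after <= limit, (
        f'merge on {on!r} produced {after} rows from {before} '
        f'(limit {limit:.0f}); is the join key unique in the right table?'
    )
    return result


# Illustrative data (assumed column names).
orders = pd.DataFrame({'customer_id': [1, 2, 2], 'quantity': [1, 2, 3]})
customers = pd.DataFrame({'customer_id': [1, 2], 'name': ['Ann', 'Bob']})
enriched = merge_guarded(orders, customers, on='customer_id')
```

pandas' own `merge` also accepts a `validate` argument (for example `validate='many_to_one'`), which raises an error when the key is not unique on the right side. The explicit row-count assertion is shown here because it mirrors the fraction-based guard used for filters.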
def drop_nulls_guarded(df, required_cols, max_drop_fraction=0.3):
    # Drop rows with nulls in required_cols; fail if too many rows are lost.
    before = len(df)
    result = df.dropna(subset=required_cols)
    after = len(result)
    # Avoid division by zero when the input frame is empty.
    drop_fraction = (before - after) / before if before else 0.0
    assert drop_fraction <= max_drop_fraction, (
        f'dropna removed {drop_fraction:.1%} of rows '
        f'(limit {max_drop_fraction:.1%})'
    )
    return result

All lessons in this course
- Structuring Transformation Steps as Functions
- Parameterising Pipelines with Config Dicts
- Testing Pipeline Steps with Assertions
- Scheduling and Logging Pipeline Runs