Why SQLite for Durable Workflows?
If you have ever debugged a workflow that lost state after a crash, ran the same task twice, or silently dropped an event — you know why durable workflows matter. The idea is simple: every step, every event, and every piece of state lives in a database so that you can replay, recover, and reason about what happened.
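While tools like Temporal and Azure Durable Functions solve this problem at scale, most projects don't need a distributed system. SQLite, the most widely deployed database engine in the world, is more than enough for single-node durable workflows. Its ACID transactions make every state change atomic: a step is either recorded as completed or not recorded at all, so a step that finished is never re-run. Side effects outside the database don't get that guarantee. If the process dies after a handler has called an external API but before its result is committed, the handler runs again on recovery. The database bookkeeping is exactly-once, but handler execution is at-least-once, so handlers should be idempotent.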
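Here is a minimal sketch of the atomicity this relies on. It uses Python's sqlite3 connection as a context manager, with a simplified two-table layout rather than the full schema built in Step 1. A step's completion row and its audit event are written in one transaction, so a failure between the two writes leaves neither behind. The record_step and atomicity_demo names are illustrative.

```python
import sqlite3


def record_step(conn: sqlite3.Connection, workflow_id: str, step_name: str,
                crash_midway: bool = False) -> None:
    """Write a step's completion row and its audit event in ONE transaction.

    `with conn:` commits when the block finishes and rolls back if it raises,
    so either both rows are written or neither is.
    """
    with conn:
        conn.execute(
            "INSERT INTO steps (workflow_id, step_name, status) VALUES (?, ?, 'completed')",
            (workflow_id, step_name),
        )
        if crash_midway:
            # Stand-in for a crash between the two writes (illustration only).
            raise RuntimeError("simulated crash")
        conn.execute(
            "INSERT INTO events (workflow_id, event_type) VALUES (?, 'step_completed')",
            (workflow_id,),
        )


def atomicity_demo() -> tuple[int, int]:
    conn = sqlite3.connect(":memory:")
    # Simplified tables for this demo, not the full Step 1 schema.
    conn.executescript("""
        CREATE TABLE steps (workflow_id TEXT, step_name TEXT, status TEXT);
        CREATE TABLE events (workflow_id TEXT, event_type TEXT);
    """)
    try:
        record_step(conn, "wf-1", "fetch_data", crash_midway=True)
    except RuntimeError:
        pass  # the half-finished transaction was rolled back
    record_step(conn, "wf-1", "fetch_data")
    steps = conn.execute("SELECT COUNT(*) FROM steps").fetchone()[0]
    events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    conn.close()
    return steps, events


if __name__ == "__main__":
    print(atomicity_demo())  # (1, 1): the failed attempt left no partial rows
```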
This guide walks you through building a minimal, crash-recoverable durable workflow engine from scratch, using only Python's standard library and SQLite.
What You'll Build
By the end of this tutorial you will have:
- A workflow definition system that chains tasks together
- Persistent state storage with crash recovery
- Automatic retry with exponential back-off
- An event-sourcing log for debugging and auditing
- A runnable example: a three-step data pipeline (fetch → transform → load)
Step 1: Set Up the Database Schema
Our engine needs three tables: one for workflow instances and their state, one for the event log, and one for step-level progress.
```python
import sqlite3


def init_db(db_path: str = "workflows.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # readers and the writer don't block each other
    conn.execute("PRAGMA foreign_keys=ON")   # enforce the REFERENCES clauses below
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS workflows (
            id          TEXT PRIMARY KEY,
            definition  TEXT NOT NULL,                   -- JSON workflow definition
            state       TEXT NOT NULL DEFAULT 'pending',
            payload     TEXT,
            result      TEXT,
            created_at  TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
            updated_at  TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );

        CREATE TABLE IF NOT EXISTS events (
            id           INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id  TEXT NOT NULL REFERENCES workflows(id),
            event_type   TEXT NOT NULL,
            data         TEXT,
            occurred_at  TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );

        CREATE TABLE IF NOT EXISTS steps (
            id           INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id  TEXT NOT NULL REFERENCES workflows(id),
            step_name    TEXT NOT NULL,
            status       TEXT NOT NULL DEFAULT 'pending',
            retries      INTEGER DEFAULT 0,
            error        TEXT,
            result       TEXT,
            updated_at   TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
            -- One row per (workflow, step); without this, the
            -- "INSERT ... ON CONFLICT DO NOTHING" in Step 3 never conflicts
            -- and every re-run would add a duplicate row.
            UNIQUE (workflow_id, step_name)
        );
    """)
    return conn
```
The journal_mode=WAL pragma enables Write-Ahead Logging, giving us better read concurrency without sacrificing safety. The three-table design separates instance tracking, event sourcing, and step-level progress.
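To see what WAL changes in practice, here is a small self-contained experiment. The file name demo.db, the table t, and the function name are made up for illustration. A reader holds an open read transaction while a writer commits. In WAL mode the commit succeeds immediately, and the reader keeps seeing its original snapshot until its transaction ends. In the default rollback-journal mode, the same commit would have to wait for the reader's lock to be released.

```python
import os
import sqlite3
import tempfile


def wal_snapshot_demo() -> tuple[str, int, int, int]:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None: no implicit transactions, so BEGIN/COMMIT are explicit.
        writer = sqlite3.connect(path, isolation_level=None, timeout=0.5)
        reader = sqlite3.connect(path, isolation_level=None, timeout=0.5)
        try:
            mode = writer.execute("PRAGMA journal_mode=WAL").fetchone()[0]
            writer.execute("CREATE TABLE t (x INTEGER)")
            writer.execute("INSERT INTO t VALUES (1)")

            reader.execute("BEGIN")  # the read snapshot is taken at the first SELECT
            before = reader.execute("SELECT COUNT(*) FROM t").fetchone()[0]

            # The writer commits while the reader's transaction is still open.
            writer.execute("INSERT INTO t VALUES (2)")

            # Same read transaction: still sees the snapshot it started with.
            during = reader.execute("SELECT COUNT(*) FROM t").fetchone()[0]
            reader.execute("COMMIT")

            # A new read transaction sees the writer's commit.
            after = reader.execute("SELECT COUNT(*) FROM t").fetchone()[0]
        finally:
            reader.close()
            writer.close()
    return mode, before, during, after


if __name__ == "__main__":
    print(wal_snapshot_demo())  # ('wal', 1, 1, 2)
```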
Step 2: Define a Workflow Engine
Now let's build the engine that reads workflow definitions, executes steps, and persists everything along the way.
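```python
import json
import sqlite3
import time
import uuid
from typing import Callable, Optional

# A step handler takes (workflow_id, conn) and returns a JSON-serializable dict.
Handler = Callable[[str, sqlite3.Connection], dict]


class WorkflowEngine:
    def __init__(self, conn: sqlite3.Connection,
                 handlers: Optional[dict[str, Handler]] = None):
        self.conn = conn
        # The definition is stored as JSON, which can't hold function objects,
        # so steps refer to handlers by name and we resolve them here.
        self.handlers = handlers or {}

    def start(self, definition: dict, payload: Optional[dict] = None) -> str:
        """Create a new workflow instance and persist it."""
        workflow_id = str(uuid.uuid4())
        with self.conn:  # commits both INSERTs together, or rolls both back
            self.conn.execute(
                "INSERT INTO workflows (id, definition, state, payload) VALUES (?, ?, ?, ?)",
                (workflow_id, json.dumps(definition), "pending", json.dumps(payload or {})),
            )
            self.conn.execute(
                "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
                (workflow_id, "workflow_started", json.dumps({"payload": payload})),
            )
        return workflow_id

    def execute(self, workflow_id: str) -> None:
        """Run all steps of a workflow, skipping steps that already completed."""
        row = self.conn.execute(
            "SELECT definition, state FROM workflows WHERE id = ?",
            (workflow_id,),
        ).fetchone()
        if not row:
            raise ValueError(f"Workflow {workflow_id} not found")

        definition, state = json.loads(row[0]), row[1]
        if state == "completed":
            print(f"Workflow {workflow_id} already completed.")
            return

        self._set_state(workflow_id, "running")
        for step in definition.get("steps", []):
            step_name = step["name"]
            # Crash recovery: skip steps whose completion was already committed
            existing = self.conn.execute(
                "SELECT status FROM steps WHERE workflow_id = ? AND step_name = ?",
                (workflow_id, step_name),
            ).fetchone()
            if existing and existing[0] == "completed":
                print(f"Skipping completed step: {step_name}")
                continue
            if not self._execute_step(workflow_id, step):
                return  # retries exhausted; the workflow is now marked 'failed'

        self._set_state(workflow_id, "completed", event="workflow_completed")

    def _set_state(self, workflow_id: str, state: str,
                   event: Optional[str] = None) -> None:
        """Update the workflow's state (and optionally log an event) atomically."""
        with self.conn:
            self.conn.execute(
                "UPDATE workflows SET state = ?, "
                "updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE id = ?",
                (state, workflow_id),
            )
            if event:
                self.conn.execute(
                    "INSERT INTO events (workflow_id, event_type) VALUES (?, ?)",
                    (workflow_id, event),
                )
```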
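execute() reads the workflow's state and then acts on it. That is fine with a single worker. If you ever point two worker processes at the same file, both could read 'pending' and run the same workflow twice. One way to guard against that is a compare-and-set UPDATE, where only the worker whose UPDATE actually changes a row gets to run the workflow. The sketch below is an assumption, not part of the engine above, and the try_claim name is hypothetical.

```python
import sqlite3


def try_claim(conn: sqlite3.Connection, workflow_id: str) -> bool:
    """Atomically move a workflow from 'pending' to 'running'.

    The WHERE clause makes this a compare-and-set: if another worker has
    already claimed the row, the UPDATE matches nothing and rowcount is 0.
    """
    with conn:
        cur = conn.execute(
            "UPDATE workflows SET state = 'running' WHERE id = ? AND state = 'pending'",
            (workflow_id,),
        )
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE workflows (id TEXT PRIMARY KEY, state TEXT NOT NULL)")
    conn.execute("INSERT INTO workflows VALUES ('wf-1', 'pending')")
    conn.commit()
    print(try_claim(conn, "wf-1"))  # True: this worker now owns the workflow
    print(try_claim(conn, "wf-1"))  # False: already claimed
```

A real multi-worker setup would also need a way to reclaim a workflow whose worker died while it was 'running' (for example, a lease timestamp that expires). This sketch leaves that out.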
Step 3: Execute Individual Steps with Retry
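Each step is retried on failure with exponential back-off (1 s, 2 s, 4 s, and so on), so a transient error such as a network timeout doesn't fail the whole workflow. The outcome of every attempt is committed to the steps table, so the last error and the retry count survive a crash.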
```python
    # (continued) more WorkflowEngine methods

    def _execute_step(self, workflow_id: str, step: dict) -> bool:
        """Run one step with retries; return True on success, False if it gave up."""
        step_name = step["name"]
        max_retries = step.get("retries", 3)

        # Create the step row once; this is a no-op on re-runs because of the
        # UNIQUE (workflow_id, step_name) constraint on the steps table.
        with self.conn:
            self.conn.execute(
                """INSERT INTO steps (workflow_id, step_name, status, retries)
                   VALUES (?, ?, 'pending', 0)
                   ON CONFLICT DO NOTHING""",
                (workflow_id, step_name),
            )

        # The stored definition names the handler; resolve it via the registry.
        handler = self.handlers.get(step.get("handler", step_name))

        for attempt in range(max_retries + 1):
            try:
                if handler is not None:
                    result = handler(workflow_id, self.conn)
                else:
                    result = {"note": f"Executed {step_name}"}  # placeholder step
                # Mark the step completed and log the event in one transaction
                with self.conn:
                    self.conn.execute(
                        """UPDATE steps SET status = 'completed', result = ?,
                           updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
                           WHERE workflow_id = ? AND step_name = ?""",
                        (json.dumps(result), workflow_id, step_name),
                    )
                    self.conn.execute(
                        "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
                        (workflow_id, "step_completed",
                         json.dumps({"step": step_name, "result": result})),
                    )
                print(f"Step '{step_name}' completed (attempt {attempt + 1})")
                return True
            except Exception as e:
                # Persist the failure before deciding whether to retry
                with self.conn:
                    self.conn.execute(
                        """UPDATE steps SET status = 'failed', retries = ?, error = ?,
                           updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
                           WHERE workflow_id = ? AND step_name = ?""",
                        (attempt + 1, str(e), workflow_id, step_name),
                    )
                print(f"Step '{step_name}' failed (attempt {attempt + 1}): {e}")
                if attempt < max_retries:
                    wait = 2 ** attempt  # exponential back-off: 1 s, 2 s, 4 s, ...
                    print(f"  Retrying in {wait}s...")
                    time.sleep(wait)

        # All retries exhausted
        self._fail_workflow(
            workflow_id, f"Step '{step_name}' failed after {max_retries + 1} attempts"
        )
        return False

    def _fail_workflow(self, workflow_id: str, reason: str) -> None:
        """Mark the workflow failed and record why, in one transaction."""
        with self.conn:
            self.conn.execute(
                """UPDATE workflows SET state = 'failed',
                   updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE id = ?""",
                (workflow_id,),
            )
            self.conn.execute(
                "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
                (workflow_id, "workflow_failed", json.dumps({"reason": reason})),
            )
```
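The retry loop above hard-codes time.sleep and an uncapped 2 ** attempt delay. If you want to unit-test the schedule or cap long waits, one option is to pull the policy into a small helper that takes the sleep function as a parameter. The run_with_backoff name, the cap parameter, and the full-jitter option are illustrative assumptions, not part of the engine.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_backoff(
    fn: Callable[[int], T],
    max_retries: int = 3,
    base: float = 1.0,
    cap: float = 30.0,
    jitter: bool = False,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(attempt) until it succeeds, retrying up to max_retries times.

    Between attempts it waits min(cap, base * 2**attempt) seconds; with
    jitter=True the wait is drawn uniformly from [0, that value] instead
    ("full jitter"). If every attempt fails, the last exception is re-raised.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return fn(attempt)
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            delay = min(cap, base * 2 ** attempt)
            if jitter:
                delay = random.uniform(0, delay)
            sleep(delay)
    raise AssertionError("unreachable")  # the loop always returns or raises


if __name__ == "__main__":
    waits = []

    def flaky(attempt: int) -> str:
        if attempt < 2:  # fail the first two attempts
            raise ConnectionError("transient")
        return "ok"

    print(run_with_backoff(flaky, sleep=waits.append), waits)  # ok [1.0, 2.0]
```

Jitter matters mostly when many workflows retry against the same downstream service at the same moment. For a single-node engine, the deterministic schedule is usually fine.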
Step 4: Define a Handler Registry
In practice, you want a clean separation between workflow definitions and the code that runs each step. A handler registry makes this easy:
```python
import json
import sqlite3


# Define your handlers separately from the workflow definition
def fetch_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    """Step 1: Fetch data from an API."""
    # Replace with a real API call
    data = {"records": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}
    print(f"Fetched {len(data['records'])} records")
    return data


def transform_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    """Step 2: Transform the fetched data."""
    # Read fetch_data's committed result for *this* workflow instance
    payload = json.loads(conn.execute(
        "SELECT result FROM steps WHERE workflow_id = ? AND step_name = 'fetch_data'",
        (workflow_id,),
    ).fetchone()[0])
    transformed = [{"upper_name": r["name"].upper()} for r in payload["records"]]
    print(f"Transformed {len(transformed)} records")
    return {"records": transformed}


def load_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    """Step 3: Store the transformed data."""
    payload = json.loads(conn.execute(
        "SELECT result FROM steps WHERE workflow_id = ? AND step_name = 'transform_data'",
        (workflow_id,),
    ).fetchone()[0])
    # Replace with a real insert into your destination
    print(f"Loaded {len(payload['records'])} records into destination")
    return {"loaded": len(payload["records"])}


# Registry: maps the handler names used in workflow definitions to functions
HANDLERS = {
    "fetch_data": fetch_data,
    "transform_data": transform_data,
    "load_data": load_data,
}
```
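A dictionary literal works, but it's easy to forget to add a new handler to it. One common alternative registers each function where it is defined, using a decorator. The register decorator and REGISTRY dict below are hypothetical and not part of the code above. The sketch also shows why a stored definition should refer to handlers by name: json.dumps accepts the names but rejects function objects.

```python
import json
import sqlite3
from typing import Callable, Dict

Handler = Callable[[str, sqlite3.Connection], dict]

# Hypothetical registry, filled in by the decorator below.
REGISTRY: Dict[str, Handler] = {}


def register(name: str) -> Callable[[Handler], Handler]:
    """Decorator that records a handler under `name` and returns it unchanged."""
    def decorator(fn: Handler) -> Handler:
        if name in REGISTRY:
            raise ValueError(f"duplicate handler name: {name}")
        REGISTRY[name] = fn
        return fn
    return decorator


@register("fetch_data")
def fetch_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    return {"records": [{"id": 1, "name": "Alice"}]}


# A definition that names its handlers is plain data and round-trips through JSON...
definition = {"steps": [{"name": "fetch_data", "handler": "fetch_data"}]}
restored = json.loads(json.dumps(definition))
handler = REGISTRY[restored["steps"][0]["handler"]]  # resolve the name back to code

# ...whereas one that embeds the function object cannot be stored at all.
try:
    json.dumps({"steps": [{"name": "fetch_data", "handler": fetch_data}]})
except TypeError as exc:
    print(f"Not serializable: {exc}")
```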
Step 5: Put It All Together
Here's the complete runner — define the workflow, wire in handlers, and execute:
```python
def main():
    conn = init_db("workflows.db")
    engine = WorkflowEngine(conn, handlers=HANDLERS)

    # The definition is stored as JSON, so each step names its handler
    # instead of holding the function object itself.
    workflow_def = {
        "steps": [
            {"name": "fetch_data", "handler": "fetch_data", "retries": 3},
            {"name": "transform_data", "handler": "transform_data", "retries": 2},
            {"name": "load_data", "handler": "load_data", "retries": 3},
        ]
    }

    # Start and execute
    wf_id = engine.start(workflow_def, payload={"source": "api.example.com/data"})
    print(f"Started workflow: {wf_id}")
    engine.execute(wf_id)

    # Check final state: execute() records 'completed' or 'failed' itself
    state = conn.execute("SELECT state FROM workflows WHERE id = ?", (wf_id,)).fetchone()[0]
    print(f"Final state: {state}")

    # Audit: print event log
    print("\n--- Event Log ---")
    for event_type, data, occurred_at in conn.execute(
        "SELECT event_type, data, occurred_at FROM events WHERE workflow_id = ? ORDER BY id",
        (wf_id,),
    ):
        print(f"  [{occurred_at}] {event_type}: {data}")

    conn.close()


if __name__ == "__main__":
    main()
```
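The event log printed at the end is more than an audit trail. Because state changes are recorded as events, you can rebuild a workflow's status from the events alone. Below is a minimal sketch of that replay. It assumes the event types workflow_started, step_completed and workflow_failed written by the engine, plus an optional workflow_completed event; the replay function itself is hypothetical.

```python
import json
import sqlite3
from typing import Any


def replay(conn: sqlite3.Connection, workflow_id: str) -> dict[str, Any]:
    """Rebuild a workflow's state by folding over its events in insertion order."""
    state: dict[str, Any] = {"state": "unknown", "completed_steps": [], "results": {}}
    rows = conn.execute(
        "SELECT event_type, data FROM events WHERE workflow_id = ? ORDER BY id",
        (workflow_id,),
    )
    for event_type, data in rows:
        body = json.loads(data) if data else {}
        if event_type == "workflow_started":
            state["state"] = "pending"
            state["payload"] = body.get("payload")
        elif event_type == "step_completed":
            state["completed_steps"].append(body["step"])
            state["results"][body["step"]] = body.get("result")
        elif event_type == "workflow_failed":
            state["state"] = "failed"
            state["reason"] = body.get("reason")
        elif event_type == "workflow_completed":
            state["state"] = "completed"
    return state


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " workflow_id TEXT, event_type TEXT, data TEXT)"
    )
    conn.executemany(
        "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
        [
            ("wf-1", "workflow_started", json.dumps({"payload": {"source": "x"}})),
            ("wf-1", "step_completed", json.dumps({"step": "fetch_data", "result": {"n": 2}})),
            ("wf-1", "workflow_failed", json.dumps({"reason": "transform_data gave up"})),
        ],
    )
    print(replay(conn, "wf-1"))
```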
Step 6: Verify Crash Recovery
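The real test of a durable workflow is recovery. Here's how to check it:
- Run the workflow once; it completes all three steps.
- Start another run and kill the process mid-flight, for example by adding a time.sleep() to transform_data and pressing Ctrl+C while it waits. Keep the database file, because it holds the state you are recovering from.
- Call engine.execute() again with the same workflow ID. Completed steps are skipped, and the remaining steps run from where the previous run left off.

The key is the check in execute(): if a step's status is already completed, it is skipped. Temporal reaches the same goal differently, by replaying the workflow's recorded event history so that activities that already completed are not executed again.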
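To try the recovery path without waiting for a real crash, here is a stripped-down simulation. It is a sketch, not the engine above. The run_steps, open_db and SimulatedCrash names and the temporary database file are hypothetical, and steps are reduced to their names. The first "process" dies before transform_data, and a second connection to the same file picks up where it stopped.

```python
import os
import sqlite3
import tempfile
from typing import Optional

STEPS = ["fetch_data", "transform_data", "load_data"]


class SimulatedCrash(Exception):
    """Stands in for the process dying mid-workflow (illustration only)."""


def open_db(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS steps ("
        "  workflow_id TEXT NOT NULL,"
        "  step_name   TEXT NOT NULL,"
        "  status      TEXT NOT NULL,"
        "  UNIQUE (workflow_id, step_name))"
    )
    return conn


def run_steps(conn: sqlite3.Connection, workflow_id: str,
              crash_before: Optional[str] = None) -> list[str]:
    """Run the steps that aren't completed yet; return the names actually executed."""
    executed = []
    for name in STEPS:
        row = conn.execute(
            "SELECT status FROM steps WHERE workflow_id = ? AND step_name = ?",
            (workflow_id, name),
        ).fetchone()
        if row and row[0] == "completed":
            continue  # committed by an earlier run: skip it
        if name == crash_before:
            raise SimulatedCrash(f"process died before {name}")
        executed.append(name)  # real step work would happen here
        with conn:  # commit the completion mark before moving on
            conn.execute(
                "INSERT OR REPLACE INTO steps (workflow_id, step_name, status) "
                "VALUES (?, ?, 'completed')",
                (workflow_id, name),
            )
    return executed


def crash_and_resume_demo() -> tuple[list[str], list[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "recovery.db")
        first = open_db(path)
        try:
            run_steps(first, "wf-1", crash_before="transform_data")
        except SimulatedCrash:
            pass
        first.close()  # the "crashed" process is gone

        second = open_db(path)  # a fresh process reopens the same file
        resumed = run_steps(second, "wf-1")
        done = [r[0] for r in second.execute(
            "SELECT step_name FROM steps WHERE workflow_id = ? ORDER BY rowid", ("wf-1",))]
        second.close()
    return resumed, done


if __name__ == "__main__":
    print(crash_and_resume_demo())
    # (['transform_data', 'load_data'], ['fetch_data', 'transform_data', 'load_data'])
```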
When to Use This vs. a Full Orchestrator
| Scenario | SQLite Workflow | Temporal / Durable Functions |
|---|---|---|
| Single-node service | ✅ Perfect fit | Overkill |
| < 100 workflows/day | ✅ Great | Unnecessary complexity |
| Multi-node distributed | ❌ Not suitable | ✅ Right tool |
| Cross-service coordination | ❌ Limited | ✅ Built for it |
| Zero infrastructure overhead | ✅ One file, no dependencies beyond the standard library | Requires a server deployment or a managed service |
Next Steps
This foundation can be extended in several directions:
- Scheduled workflows: Add a scheduled_at column and a polling loop to run workflows at specific times.
- Webhooks and signals: Let external systems send events into a running workflow by inserting into the events table.
- Parallel steps: Add a depends_on field and execute independent steps concurrently.
- Metrics dashboard: Query the steps and events tables to build a simple monitoring UI.
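For the parallel-steps idea, Python's standard library already covers the scheduling half: graphlib.TopologicalSorter (Python 3.9+) hands out the steps whose dependencies are done. The sketch below assumes a hypothetical depends_on list on each step, and parallel_batches is a made-up name. It only computes which steps could run together. Actually running a batch concurrently (for example with a ThreadPoolExecutor) is left out; note that by default a sqlite3 connection can only be used from the thread that created it, so each worker thread would open its own.

```python
import graphlib


def parallel_batches(steps: list[dict]) -> list[list[str]]:
    """Group steps into batches; each step's dependencies all sit in earlier
    batches, so the steps inside one batch could run concurrently."""
    graph = {s["name"]: set(s.get("depends_on", [])) for s in steps}
    sorter = graphlib.TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the dependencies form a loop
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to make the output stable
        batches.append(ready)
        sorter.done(*ready)  # a real engine would call done() as each step finishes
    return batches


if __name__ == "__main__":
    steps = [
        {"name": "fetch_data"},
        {"name": "transform_a", "depends_on": ["fetch_data"]},
        {"name": "transform_b", "depends_on": ["fetch_data"]},
        {"name": "load_data", "depends_on": ["transform_a", "transform_b"]},
    ]
    print(parallel_batches(steps))
    # [['fetch_data'], ['transform_a', 'transform_b'], ['load_data']]
```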
The full code for this tutorial is available as a single-file implementation — no dependencies beyond Python's standard library and SQLite. That's the beauty of durable workflows done right: sometimes the simplest tool is the best one.