Why SQLite for Durable Workflows?

If you have ever debugged a workflow that lost state after a crash, ran the same task twice, or silently dropped an event — you know why durable workflows matter. The idea is simple: every step, every event, and every piece of state lives in a database so that you can replay, recover, and reason about what happened.

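While tools like Temporal and Durable Functions solve this problem at scale, most projects don't need a distributed system. SQLite, the most widely deployed database engine in the world, is more than enough for single-node durable workflows. Its ACID transactions let you record a step's result and its log entry atomically, so a crash never leaves state half-written. Keep in mind that this gives you at-least-once execution rather than exactly-once: a step that crashes after its side effect but before its commit will run again on recovery, so handlers should be idempotent.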

This guide walks you through building a minimal but production-ready durable workflow engine using Python and SQLite, from scratch.

What You'll Build

By the end of this tutorial you will have:

  • A workflow definition system that chains tasks together
  • Persistent state storage with crash recovery
  • Automatic retry with exponential back-off
  • An event-sourcing log for debugging and auditing
  • A runnable example: a three-step data pipeline (fetch → transform → load)

Step 1: Set Up the Database Schema

Our engine needs three tables: one for workflow instances and their state, one for the event log, and one for per-step progress.

import sqlite3

def init_db(db_path: str = "workflows.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # better concurrency
    conn.execute("PRAGMA foreign_keys=ON")

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS workflows (
            id          TEXT PRIMARY KEY,
            definition  TEXT NOT NULL,
            state       TEXT NOT NULL DEFAULT 'pending',
            payload     TEXT,
            result      TEXT,
            created_at  TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
            updated_at  TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );

        CREATE TABLE IF NOT EXISTS events (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id TEXT NOT NULL REFERENCES workflows(id),
            event_type  TEXT NOT NULL,
            data        TEXT,
            occurred_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
        );

        CREATE TABLE IF NOT EXISTS steps (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id TEXT NOT NULL REFERENCES workflows(id),
            step_name   TEXT NOT NULL,
            status      TEXT NOT NULL DEFAULT 'pending',
            retries     INTEGER DEFAULT 0,
            error       TEXT,
            result      TEXT,
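            updated_at  TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
            -- One row per step per workflow; the ON CONFLICT upsert in Step 3 relies on this
            UNIQUE (workflow_id, step_name)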
        );
    """)

    return conn

The journal_mode=WAL pragma enables Write-Ahead Logging, giving us better read concurrency without sacrificing safety. The three-table design separates instance tracking, event sourcing, and step-level progress.
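
Because state and events live in the same database file, a state change and its event-log entry can be committed as a single unit. The sketch below illustrates this on a throwaway in-memory database. The simplified tables and the mark_running helper are assumptions made for the example, not part of the engine. It relies on the fact that a sqlite3 connection used as a context manager commits on success and rolls back if the block raises.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE workflows (id TEXT PRIMARY KEY, state TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, workflow_id TEXT, event_type TEXT)"
)
conn.execute("INSERT INTO workflows (id, state) VALUES ('wf-1', 'pending')")
conn.commit()


def mark_running(conn, workflow_id, fail=False):
    """Hypothetical helper: update state and log the event in one transaction."""
    with conn:  # commits on success, rolls back if the block raises
        conn.execute(
            "UPDATE workflows SET state = 'running' WHERE id = ?", (workflow_id,)
        )
        if fail:
            raise RuntimeError("simulated crash between the two writes")
        conn.execute(
            "INSERT INTO events (workflow_id, event_type) VALUES (?, 'workflow_running')",
            (workflow_id,),
        )


def snapshot(conn):
    """Return (workflow state, number of logged events)."""
    state = conn.execute("SELECT state FROM workflows WHERE id = 'wf-1'").fetchone()[0]
    count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    return state, count


try:
    mark_running(conn, "wf-1", fail=True)
except RuntimeError:
    pass
after_failure = snapshot(conn)   # the UPDATE was rolled back with the failed block

mark_running(conn, "wf-1")
after_success = snapshot(conn)   # both writes committed together

print(after_failure, after_success)
```

The engine in this tutorial calls commit() explicitly instead of using the context manager; either style works as long as related writes share one transaction.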

Step 2: Define a Workflow Engine

Now let's build the engine that reads workflow definitions, executes steps, and persists everything along the way.

import uuid
import json
import time

class WorkflowEngine:
    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn

    def start(self, definition: dict, payload: dict = None) -> str:
        """Create a new workflow instance and persist it."""
        workflow_id = str(uuid.uuid4())
        cur = self.conn.cursor()
        cur.execute(
            "INSERT INTO workflows (id, definition, state, payload) VALUES (?, ?, ?, ?)",
            (workflow_id, json.dumps(definition), "pending", json.dumps(payload or {}))
        )
        cur.execute(
            "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
            (workflow_id, "workflow_started", json.dumps({"payload": payload}))
        )
        self.conn.commit()
        return workflow_id

    def execute(self, workflow_id: str) -> None:
        """Run all steps of a workflow, with crash recovery."""
        row = self.conn.execute(
            "SELECT definition, state FROM workflows WHERE id = ?",
            (workflow_id,)
        ).fetchone()

        if not row:
            raise ValueError(f"Workflow {workflow_id} not found")

        definition, state = json.loads(row[0]), row[1]
        if state == "completed":
            print(f"Workflow {workflow_id} already completed.")
            return

        steps = definition.get("steps", [])

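        for step in steps:
            step_name = step["name"]
            # Check if this step already succeeded (crash recovery)
            existing = self.conn.execute(
                "SELECT status FROM steps WHERE workflow_id = ? AND step_name = ?",
                (workflow_id, step_name)
            ).fetchone()

            if existing and existing[0] == "completed":
                print(f"Skipping completed step: {step_name}")
                continue

            self._execute_step(workflow_id, step)

            # Stop if the step exhausted its retries; later steps depend on its result
            status = self.conn.execute(
                "SELECT status FROM steps WHERE workflow_id = ? AND step_name = ?",
                (workflow_id, step_name)
            ).fetchone()
            if not status or status[0] != "completed":
                return

        # Every step succeeded: record completion together with its event
        self.conn.execute(
            "UPDATE workflows SET state = 'completed', updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE id = ?",
            (workflow_id,)
        )
        self.conn.execute(
            "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
            (workflow_id, "workflow_completed", json.dumps({}))
        )
        self.conn.commit()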

Step 3: Execute Individual Steps with Retry

Each step gets retried on failure with exponential back-off. This is the core of durability — transient failures don't kill the workflow.

    def _execute_step(self, workflow_id: str, step: dict) -> None:
        step_name = step["name"]
        max_retries = step.get("retries", 3)

        # Upsert the step record
        self.conn.execute(
            """INSERT INTO steps (workflow_id, step_name, status, retries)
               VALUES (?, ?, 'pending', 0)
               ON CONFLICT DO NOTHING""",
            (workflow_id, step_name)
        )
        self.conn.commit()

        for attempt in range(max_retries + 1):
            try:
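                # Resolve the handler. A definition loaded from the database can
                # only carry handler names (strings), which are looked up in the
                # HANDLERS registry from Step 4; callables are used directly.
                handler = step.get("handler")
                if isinstance(handler, str):
                    handler = HANDLERS.get(handler)
                if callable(handler):
                    result = handler(workflow_id, self.conn)
                else:
                    result = {"note": f"Executed {step_name}"}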

                # Mark as completed
                self.conn.execute(
                    "UPDATE steps SET status = 'completed', result = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE workflow_id = ? AND step_name = ?",
                    (json.dumps(result), workflow_id, step_name)
                )
                self.conn.execute(
                    "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
                    (workflow_id, "step_completed", json.dumps({"step": step_name, "result": result}))
                )
                self.conn.commit()
                print(f"Step '{step_name}' completed (attempt {attempt + 1})")
                return

            except Exception as e:
                wait = 2 ** attempt  # exponential back-off
                self.conn.execute(
                    """UPDATE steps SET status = 'failed', retries = ?,
                       error = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')
                       WHERE workflow_id = ? AND step_name = ?""",
                    (attempt + 1, str(e), workflow_id, step_name)
                )
                self.conn.commit()
                print(f"Step '{step_name}' failed (attempt {attempt + 1}): {e}")

                if attempt < max_retries:
                    print(f"  Retrying in {wait}s...")
                    time.sleep(wait)

        # All retries exhausted
        self._fail_workflow(workflow_id, f"Step '{step_name}' failed after {max_retries + 1} attempts")

    def _fail_workflow(self, workflow_id: str, reason: str) -> None:
        self.conn.execute(
            "UPDATE workflows SET state = 'failed', updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') WHERE id = ?",
            (workflow_id,)
        )
        self.conn.execute(
            "INSERT INTO events (workflow_id, event_type, data) VALUES (?, ?, ?)",
            (workflow_id, "workflow_failed", json.dumps({"reason": reason}))
        )
        self.conn.commit()
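
To make the retry timing concrete, here is a small sketch that computes the waits produced by the 2 ** attempt rule in _execute_step. The backoff_schedule function and its optional cap parameter are illustrative additions; the engine above has no cap.

```python
def backoff_schedule(max_retries, cap=None):
    """Seconds slept after each failed attempt that still has a retry left."""
    waits = [2 ** attempt for attempt in range(max_retries)]
    if cap is not None:
        # Hypothetical extension: never wait longer than `cap` seconds
        waits = [min(wait, cap) for wait in waits]
    return waits


schedule = backoff_schedule(3)
print(schedule, "total:", sum(schedule))

capped = backoff_schedule(6, cap=10)
print(capped)
```

With the default of 3 retries, a step is attempted up to four times and the engine sleeps 1, 2, and 4 seconds between attempts, about 7 seconds in total before the workflow is marked as failed. For steps with many retries, a cap keeps the last waits from growing unreasonably long.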

Step 4: Define a Handler Registry

In practice, you want a clean separation between workflow definitions and the code that runs each step. A handler registry makes this easy:

# Define your handlers separately
def fetch_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    """Step 1: Fetch data from an API."""
    # Replace with real API call
    data = {"records": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}
    print(f"Fetched {len(data['records'])} records")
    return data

def transform_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    """Step 2: Transform the fetched data."""
    payload = json.loads(conn.execute(
        "SELECT result FROM steps WHERE workflow_id = ? AND step_name = 'fetch_data'",
        (workflow_id,)
    ).fetchone()[0])
    transformed = [{"upper_name": r["name"].upper()} for r in payload["records"]]
    print(f"Transformed {len(transformed)} records")
    return {"records": transformed}

def load_data(workflow_id: str, conn: sqlite3.Connection) -> dict:
    """Step 3: Store the transformed data."""
    payload = json.loads(conn.execute(
        "SELECT result FROM steps WHERE workflow_id = ? AND step_name = 'transform_data'",
        (workflow_id,)
    ).fetchone()[0])
    # Replace with real DB insert
    print(f"Loaded {len(payload['records'])} records into destination")
    return {"loaded": len(payload["records"])}

HANDLERS = {
    "fetch_data": fetch_data,
    "transform_data": transform_data,
    "load_data": load_data,
}
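
A registry also matters for persistence: Python functions cannot be serialized with json.dumps, so a definition stored in the database can only refer to handlers by name. The following sketch shows one way to do that. The fetch placeholder, the REGISTRY dict, and the resolve_handler helper are hypothetical names chosen for this example.

```python
import json


def fetch(workflow_id, conn):
    """Placeholder handler standing in for a real step."""
    return {"records": 2}


REGISTRY = {"fetch": fetch}

# The definition holds only strings and numbers, so it survives a JSON round trip
definition = {"steps": [{"name": "fetch", "handler": "fetch", "retries": 3}]}
stored = json.dumps(definition)   # what would go into workflows.definition
loaded = json.loads(stored)       # what the engine reads back after a restart


def resolve_handler(step, registry):
    """Map a step's handler name (or its step name) to a callable."""
    name = step.get("handler", step["name"])
    try:
        return registry[name]
    except KeyError:
        raise LookupError(f"No handler registered for {name!r}") from None


handler = resolve_handler(loaded["steps"][0], REGISTRY)
result = handler("wf-1", None)
print(result)
```

Raising on an unknown name, rather than silently skipping the step, makes a typo in a definition visible at the first run.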

Step 5: Put It All Together

Here's the complete runner — define the workflow, wire in handlers, and execute:

def main():
    conn = init_db("workflows.db")
    engine = WorkflowEngine(conn)

    # Reference handlers by registry name so the definition stays JSON-serializable
    workflow_def = {
        "steps": [
            {"name": "fetch_data",      "handler": "fetch_data",      "retries": 3},
            {"name": "transform_data",  "handler": "transform_data",  "retries": 2},
            {"name": "load_data",       "handler": "load_data",       "retries": 3},
        ]
    }

    # Start and execute
    wf_id = engine.start(workflow_def, payload={"source": "api.example.com/data"})
    print(f"Started workflow: {wf_id}")
    engine.execute(wf_id)

    # Check final state; never overwrite it here, or a failed run would be reported as completed
    state = conn.execute("SELECT state FROM workflows WHERE id = ?", (wf_id,)).fetchone()[0]
    print(f"Final state: {state}")

    # Audit: print event log
    print("\n--- Event Log ---")
    for row in conn.execute(
        "SELECT event_type, data, occurred_at FROM events WHERE workflow_id = ? ORDER BY id",
        (wf_id,)
    ):
        print(f"  [{row[2]}] {row[0]}: {row[1]}")

    conn.close()

if __name__ == "__main__":
    main()

Step 6: Verify Crash Recovery

The real test of a durable workflow is recovery. Here's how you prove it works:

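  1. Start a workflow and note its ID. Make one step fail partway through, for example by raising an exception in transform_data or by killing the process while that step runs.
  2. Keep workflows.db in place. The database file is the durable state, so deleting it discards everything needed to resume.
  3. Restart the process and call engine.execute() with the same ID (not engine.start(), which would create a new workflow). Completed steps are skipped, and the remaining steps run from where the workflow left off.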

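The key is the check in execute(): if a step's status is already completed, it gets skipped. Production orchestrators like Temporal rest on the same idea of recording completed work and not repeating it on resume, although their implementations, which replay a recorded event history, are considerably more involved.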
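
The skip-on-resume behavior can be demonstrated in isolation. The sketch below is a stripped-down stand-in for the engine, not the engine itself: the run function, the crash_at parameter, and the calls list are assumptions made for the demonstration, and a single in-memory connection plays the role of the database file that survives a restart.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE steps (
        workflow_id TEXT NOT NULL,
        step_name   TEXT NOT NULL,
        status      TEXT NOT NULL,
        result      TEXT,
        UNIQUE (workflow_id, step_name)
    )
""")

calls = []  # records every step body that actually ran


def run(workflow_id, step_names, crash_at=None):
    """Run steps in order, skipping any that are already marked completed."""
    for name in step_names:
        done = conn.execute(
            "SELECT 1 FROM steps WHERE workflow_id = ? AND step_name = ? "
            "AND status = 'completed'",
            (workflow_id, name),
        ).fetchone()
        if done:
            continue  # finished before the crash; do not repeat it
        if name == crash_at:
            raise RuntimeError(f"simulated crash before {name}")
        calls.append(name)
        with conn:  # persist the result before moving on to the next step
            conn.execute(
                "INSERT INTO steps (workflow_id, step_name, status, result) "
                "VALUES (?, ?, 'completed', ?)",
                (workflow_id, name, json.dumps({"step": name})),
            )


STEPS = ["fetch_data", "transform_data", "load_data"]

try:
    run("wf-1", STEPS, crash_at="transform_data")  # first run dies mid-flight
except RuntimeError:
    pass

run("wf-1", STEPS)  # resume with the same workflow ID
print(calls)
```

Each step body runs exactly once across both runs, because fetch_data was committed before the simulated crash. A step that crashed after doing its work but before the commit would run a second time, which is why handlers should be idempotent.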

When to Use This vs. A Full Orchestrator

Scenario                     | SQLite Workflow        | Temporal / Durable Functions
Single-node service          | ✅ Perfect fit         | Overkill
< 100 workflows/day          | ✅ Great               | Unnecessary complexity
Multi-node distributed       | ❌ Not suitable        | ✅ Right tool
Cross-service coordination   | ❌ Limited             | ✅ Built for it
Zero infrastructure overhead | ✅ One file, zero deps | Requires cluster setup

Next Steps

This foundation can be extended in several directions:

  • Scheduled workflows: Add a scheduled_at column and a polling loop to run workflows at specific times.
  • Webhooks and signals: Let external systems send events into a running workflow by inserting into the events table.
  • Parallel steps: Add a depends_on field and execute independent steps concurrently.
  • Metrics dashboard: Query the steps and events tables to build a simple monitoring UI.
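
As a starting point for the first extension, scheduled workflows, the following sketch shows the query a polling loop could run. It assumes a scheduled_at column holding ISO-8601 UTC timestamps in one consistent format, so that plain string comparison orders them correctly. The due_workflows helper and the sample rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE workflows (
        id           TEXT PRIMARY KEY,
        state        TEXT NOT NULL DEFAULT 'pending',
        scheduled_at TEXT  -- ISO-8601 UTC; NULL means "run as soon as possible"
    )
""")
conn.executemany(
    "INSERT INTO workflows (id, scheduled_at) VALUES (?, ?)",
    [
        ("wf-past", "2024-01-01T00:00:00Z"),
        ("wf-future", "2024-01-03T00:00:00Z"),
        ("wf-asap", None),
    ],
)
conn.commit()


def due_workflows(conn, now_iso):
    """Return IDs of pending workflows whose scheduled time has passed."""
    rows = conn.execute(
        "SELECT id FROM workflows "
        "WHERE state = 'pending' AND (scheduled_at IS NULL OR scheduled_at <= ?) "
        "ORDER BY id",
        (now_iso,),
    ).fetchall()
    return [row[0] for row in rows]


# A polling loop would call this periodically and pass each ID to engine.execute()
due = due_workflows(conn, "2024-01-02T00:00:00Z")
print(due)
```

Passing the current time in as an argument, instead of reading the clock inside the function, keeps the query easy to test.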

The full code for this tutorial is available as a single-file implementation — no dependencies beyond Python's standard library and SQLite. That's the beauty of durable workflows done right: sometimes the simplest tool is the best one.