Aggregation Pipelines and Complex Queries
Run filtering, grouping, and aggregation pipelines to power analytics-style endpoints efficiently.
Why Aggregation Pipelines?
Simple find queries return documents as-is. But analytics endpoints often need grouped, computed, and reshaped data: total revenue per month, average rating per product, top 10 active users.
MongoDB's aggregation pipeline runs this work inside the database, so you ship only the final result over the wire instead of pulling thousands of documents into Python and looping.
- Pipeline = an ordered list of stages
- Each stage takes a stream of documents in and emits documents out
- Beanie exposes it through Document.aggregate(pipeline)
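To make the "ordered list of stages" idea concrete, here is a minimal sketch of a pipeline written as a plain Python list of dictionaries. The function name top_customers_pipeline and the field names status, customer_id, and total are assumptions chosen for illustration. Each stage consumes the documents emitted by the stage before it.

```python
from typing import Any


def top_customers_pipeline(limit: int = 10) -> list[dict[str, Any]]:
    """Build a four-stage pipeline: filter, group, sort, limit."""
    return [
        # Stage 1: keep only paid orders.
        {"$match": {"status": "paid"}},
        # Stage 2: emit one document per customer with summed revenue.
        {
            "$group": {
                "_id": "$customer_id",
                "revenue": {"$sum": "$total"},
                "orders": {"$sum": 1},
            }
        },
        # Stage 3: highest revenue first.
        {"$sort": {"revenue": -1}},
        # Stage 4: keep only the top `limit` customers.
        {"$limit": limit},
    ]


pipeline = top_customers_pipeline(limit=10)

# With an initialized Beanie document class (assumed here to be called Order),
# the list would be passed along roughly like this:
#     results = await Order.aggregate(pipeline).to_list()
```

Because stage order matters, placing $match first lets the database discard irrelevant documents before the more expensive $group stage runs.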
Our Beanie Models
Throughout this lesson we use an Order document. Each order has a customer, a status, a total amount, and a created timestamp. We'll build analytics endpoints on top of it.
Beanie documents subclass beanie.Document, which is itself a Pydantic model bound to a MongoDB collection.
from datetime import datetime

from beanie import Document
from pydantic import Field


class Order(Document):
    customer_id: str
    status: str  # "paid", "pending", or "cancelled"
    total: float
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Settings:
        name = "orders"  # MongoDB collection name
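As a sketch of the kind of analytics query this model supports, the following builds a "revenue per month" pipeline over the Order fields and runs it through Beanie's aggregate(...).to_list(). The helper names monthly_revenue_pipeline and monthly_revenue are hypothetical. The document class is passed in as a parameter so the snippet stays self-contained, and in a real application you would pass the Order class after Beanie has been initialized.

```python
from typing import Any


def monthly_revenue_pipeline() -> list[dict[str, Any]]:
    """Total revenue and order count per calendar month, paid orders only."""
    return [
        {"$match": {"status": "paid"}},
        {
            "$group": {
                # Group key: created_at rendered as "YYYY-MM".
                "_id": {
                    "$dateToString": {"format": "%Y-%m", "date": "$created_at"}
                },
                "revenue": {"$sum": "$total"},
                "order_count": {"$sum": 1},
            }
        },
        # Oldest month first; "YYYY-MM" strings sort chronologically.
        {"$sort": {"_id": 1}},
    ]


async def monthly_revenue(order_model: Any) -> list[dict[str, Any]]:
    """Run the pipeline; order_model is assumed to be the Order document class."""
    return await order_model.aggregate(monthly_revenue_pipeline()).to_list()
```

Without a projection model, each result is a plain dictionary such as {"_id": "2024-01", "revenue": ..., "order_count": ...}, so only one small document per month crosses the wire.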