
Event Queues and Message Brokers

Redis queues, RabbitMQ, and Kafka for decoupled agent communication.

Why Use Message Queues for Agents?

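Message queues decouple the trigger (event producer) from the agent (event consumer). The producer can fire events at any rate while the agent processes them at its own pace, with the queue buffering the difference. This keeps bursts from overloading the agent and makes retries possible, since a failed task can be put back on the queue.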
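The decoupling can be sketched without any broker at all. The example below is a minimal in-process illustration using Python's standard `queue.Queue` as a stand-in for a message broker; the names `events`, `producer`, and `agent` are hypothetical, and the `time.sleep` call only simulates slow agent work.

```python
import queue
import threading
import time

events = queue.Queue()  # in-process stand-in for a message broker
processed = []

def producer(n):
    # Fires events as fast as it can; it never waits for the agent.
    for i in range(n):
        events.put({"id": i})
    events.put(None)  # sentinel: no more events

def agent():
    # Consumes at its own pace; the queue absorbs the burst.
    while True:
        event = events.get()
        if event is None:
            break
        time.sleep(0.01)  # simulate slow work
        processed.append(event["id"])

consumer_thread = threading.Thread(target=agent)
consumer_thread.start()
producer(5)             # returns almost immediately
consumer_thread.join()  # the agent finishes later, in arrival order
print(processed)        # [0, 1, 2, 3, 4]
```

The producer finishes before the agent does, yet no event is dropped. A real broker plays the same buffering role across processes and machines.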

Redis Lists as Queues

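Redis lists work as simple queues: LPUSH adds an item to the head of the list (enqueue), and BRPOP removes one from the tail, blocking while the list is empty (dequeue). Because items leave in the order they arrived, this gives a simple first-in-first-out queue. Delivery is at-most-once, though: BRPOP removes the item before the consumer has processed it, so a task is lost if the consumer crashes mid-processing.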
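To make the ordering concrete, the sketch below models a Redis list with a Python `deque`. This is an in-memory stand-in, not Redis itself: `appendleft` plays the role of LPUSH and `pop` the role of RPOP (the non-blocking form of BRPOP).

```python
from collections import deque

tasks = deque()  # in-memory stand-in for the Redis list

# LPUSH: each new item goes onto the head (left end) of the list
for task_id in ["task-1", "task-2", "task-3"]:
    tasks.appendleft(task_id)

# Read head to tail, the newest item comes first
snapshot = list(tasks)  # ['task-3', 'task-2', 'task-1']

# RPOP: items are taken from the tail (right end), so the oldest leaves first
order = [tasks.pop() for _ in range(len(tasks))]
print(order)  # ['task-1', 'task-2', 'task-3']
```

Pushing on one end and popping from the other is what produces first-in-first-out order; pushing and popping on the same end would give a stack instead.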

import json

import redis

# Connect to a local Redis server; decode_responses=True returns str instead of bytes
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
QUEUE_NAME = 'agent:tasks'

# Producer: push a JSON-encoded task onto the head of the list
def enqueue_task(task: dict):
    r.lpush(QUEUE_NAME, json.dumps(task))
    print(f'Enqueued task: {task["id"]}')

# Consumer: blocking pop from the tail (waits up to `timeout` seconds for a message)
def dequeue_task(timeout: int = 5):
    result = r.brpop(QUEUE_NAME, timeout=timeout)
    if result:
        _queue_name, raw_data = result  # BRPOP returns a (key, value) pair
        return json.loads(raw_data)
    return None  # Timed out - no tasks

# Producer side
enqueue_task({'id': 'task-1', 'type': 'email', 'email_id': 'email-abc'})
enqueue_task({'id': 'task-2', 'type': 'email', 'email_id': 'email-def'})

# Check queue length
print(f'Queue length: {r.llen(QUEUE_NAME)}')
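A consumer built on BRPOP takes a task off the list before processing it, so a crash during processing loses the task. One common way to support retries is to move each task into a second "processing" list and delete it only after the handler succeeds. The sketch below is one possible shape for that, not the lesson's own implementation. It assumes a redis-py client is passed in as `client`. The key `agent:tasks:processing` and the helpers `claim_task`, `ack_task`, and `process_one` are hypothetical names.

```python
import json

QUEUE_NAME = "agent:tasks"
PROCESSING_NAME = "agent:tasks:processing"  # hypothetical key for in-flight tasks

def claim_task(client, timeout=5):
    """Atomically move the oldest task from the queue to the processing list."""
    # BLMOVE with src=RIGHT pops from the tail, matching the BRPOP consumer;
    # it returns None if nothing arrives within `timeout` seconds.
    return client.blmove(QUEUE_NAME, PROCESSING_NAME, timeout, src="RIGHT", dest="LEFT")

def ack_task(client, raw):
    """Remove one finished task from the processing list."""
    client.lrem(PROCESSING_NAME, 1, raw)

def process_one(client, handler, timeout=5):
    """Claim one task, run the handler, and acknowledge only on success."""
    raw = claim_task(client, timeout)
    if raw is None:
        return False  # timed out, nothing to do
    # If the handler raises, the task stays in the processing list
    # and can be moved back onto the queue for a retry.
    handler(json.loads(raw))
    ack_task(client, raw)
    return True
```

With a running server, `client` would be an instance such as `redis.Redis(host='localhost', port=6379, decode_responses=True)`. BLMOVE requires Redis 6.2 or later. A separate recovery step, not shown here, would need to return stale entries from the processing list to the queue.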
