Filtering Events With an Aggregation Pipeline
Learners will pass a pipeline to watch() to receive only the subset of events their application cares about.
Why Filter Change Stream Events?
Without filtering, a change stream delivers every change event on a collection. In a busy production collection, this can mean thousands of events per second, most of which your application does not care about. Filtering at the server using an aggregation pipeline reduces network traffic, lowers CPU usage in your application, and ensures your event handler only processes relevant events. Filtering happens before events leave MongoDB—only matching events are transmitted to your client.
Passing a Pipeline to watch()
The first argument to watch() is an aggregation pipeline array. MongoDB applies this pipeline to each change event document before deciding whether to deliver it to your application. Not all aggregation stages are permitted in change stream pipelines—only a specific subset is allowed, primarily $match, $project, $addFields, $replaceRoot, and $redact. The $group and $lookup stages are not allowed.
// Only receive insert events — filter everything else
const changeStream = db.collection('orders').watch([
{
$match: {
operationType: 'insert'
}
}
]);
for await (const change of changeStream) {
// Only insert events arrive here
console.log('New order:', change.fullDocument._id);
}All lessons in this course
- Opening a Change Stream on a Collection
- Change Event Document Structure
- Filtering Events With an Aggregation Pipeline
- Resuming Change Streams After Interruption