Advanced Aggregation Pipelines

MongoDB’s aggregation framework is a powerful tool for data processing and analysis. This guide covers advanced techniques, optimization strategies, and complex use cases that go beyond basic queries.

Understanding the Aggregation Pipeline

The aggregation pipeline processes documents in stages, where each stage transforms the documents and passes the results to the next stage.

Pipeline Architecture

db.collection.aggregate([
  { $match: { /* filter criteria */ } },   // Stage 1: Filter
  { $group: { /* grouping logic */ } },    // Stage 2: Group
  { $sort: { /* sorting criteria */ } },   // Stage 3: Sort
  { $project: { /* field selection */ } }, // Stage 4: Project
  { $limit: 10 },                          // Stage 5: Limit
]);

Performance Considerations

  1. Place $match early - Filter documents as early as possible to reduce pipeline workload
  2. Use $project to limit fields - Only include fields needed for subsequent stages
  3. Leverage indexes - Ensure $match and $sort stages can use indexes
  4. Consider $limit placement - Use early in pipeline when possible

Advanced Pipeline Stages

$lookup - Complex Joins

Perform left outer joins with related collections:

// Basic lookup: attach each order's customer document
db.orders.aggregate([
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customer",
    },
  },
]);

// Advanced lookup with a sub-pipeline and let variables
db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { orderItems: "$items" },
      pipeline: [
        {
          $match: {
            $expr: { $in: ["$_id", "$$orderItems.productId"] },
          },
        },
        { $project: { name: 1, price: 1, category: 1 } },
      ],
      as: "productDetails",
    },
  },
]);
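To make the join semantics concrete, here is a rough in-memory sketch of what the localField/foreignField form of $lookup computes. This is an illustration only (the real join runs server-side, and the sample documents are made up): every matching foreign document is collected into the `as` array, and local documents with no match still come through with an empty array, which is what makes it a left outer join.

```javascript
// Hypothetical sketch of $lookup's left-outer-join behavior.
function lookup(localDocs, foreignDocs, { localField, foreignField, as }) {
  return localDocs.map((doc) => ({
    ...doc,
    // All matching foreign docs land in the `as` array;
    // unmatched local docs keep an empty array (left outer join).
    [as]: foreignDocs.filter((f) => f[foreignField] === doc[localField]),
  }));
}

const orders = [
  { _id: 1, customerId: "c1" },
  { _id: 2, customerId: "c9" }, // no matching customer
];
const customers = [{ _id: "c1", name: "Ada" }];

const joined = lookup(orders, customers, {
  localField: "customerId",
  foreignField: "_id",
  as: "customer",
});
// joined[0].customer -> [{ _id: "c1", name: "Ada" }]
// joined[1].customer -> []
```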

$facet - Multi-dimensional Analysis

Run several sub-pipelines over the same set of input documents within a single stage:

db.sales.aggregate([
  {
    $facet: {
      // Pipeline 1: Sales by category
      salesByCategory: [
        { $group: { _id: "$category", total: { $sum: "$amount" } } },
        { $sort: { total: -1 } },
      ],
      // Pipeline 2: Monthly trends
      monthlyTrends: [
        {
          $group: {
            _id: { $month: "$date" },
            totalSales: { $sum: "$amount" },
            avgOrder: { $avg: "$amount" },
          },
        },
      ],
      // Pipeline 3: Top customers
      topCustomers: [
        { $group: { _id: "$customerId", total: { $sum: "$amount" } } },
        { $sort: { total: -1 } },
        { $limit: 10 },
      ],
    },
  },
]);

$graphLookup - Recursive Queries

Perform recursive searches in hierarchical data:

// Find all reports in an organizational hierarchy
db.employees.aggregate([
  { $match: { name: "Alice Johnson" } },
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "managerId",
      as: "allReports",
      maxDepth: 5,
      depthField: "level",
    },
  },
]);

// Find connecting flights reachable from JFK (up to 3 hops)
db.flights.aggregate([
  {
    $graphLookup: {
      from: "flights",
      startWith: "JFK",
      connectFromField: "to",
      connectToField: "from",
      as: "connections",
      maxDepth: 3,
      restrictSearchWithMatch: { date: { $gte: new Date() } },
    },
  },
]);
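Under the hood, $graphLookup performs a breadth-first traversal: it matches documents whose connectToField equals the start value, then repeats with their connectFromField values, up to maxDepth. The sketch below models that traversal in plain JavaScript over a hypothetical in-memory employees array (the real operator runs server-side and handles cycles and indexes for you); note that depthField starts at 0 for the first level of matches.

```javascript
// Simplified breadth-first model of $graphLookup (illustrative only).
function graphLookup(docs, { startWith, connectFromField, connectToField, maxDepth, depthField }) {
  const results = [];
  let frontier = [startWith];
  for (let depth = 0; depth <= maxDepth && frontier.length > 0; depth++) {
    const next = [];
    for (const value of frontier) {
      for (const doc of docs) {
        // Match connectToField against the frontier; skip already-visited docs
        if (doc[connectToField] === value && !results.some((r) => r._id === doc._id)) {
          results.push({ ...doc, [depthField]: depth });
          next.push(doc[connectFromField]);
        }
      }
    }
    frontier = next;
  }
  return results;
}

const employees = [
  { _id: 1, name: "Alice", managerId: null },
  { _id: 2, name: "Bob", managerId: 1 },
  { _id: 3, name: "Carol", managerId: 2 },
];

const reports = graphLookup(employees, {
  startWith: 1, // Alice's _id
  connectFromField: "_id",
  connectToField: "managerId",
  maxDepth: 5,
  depthField: "level",
});
// Bob is found at level 0 (direct report), Carol at level 1
```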

$bucket and $bucketAuto - Data Segmentation

Group documents into buckets for analysis:

// Custom bucket boundaries
db.products.aggregate([
  {
    $bucket: {
      groupBy: "$price",
      boundaries: [0, 25, 50, 100, 200],
      default: "expensive",
      output: {
        count: { $sum: 1 },
        products: { $push: "$name" },
        avgPrice: { $avg: "$price" },
      },
    },
  },
]);

// Automatic bucket creation
db.orders.aggregate([
  {
    $bucketAuto: {
      groupBy: "$totalAmount",
      buckets: 5,
      output: {
        count: { $sum: 1 },
        avgAmount: { $avg: "$totalAmount" },
        orders: { $push: "$$ROOT" },
      },
    },
  },
]);
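The key detail with $bucket is that consecutive boundaries form half-open [lower, upper) ranges: the lower bound is inclusive, the upper bound is exclusive, and anything outside all ranges falls into the `default` bucket. A quick plain-JavaScript sketch of that assignment rule:

```javascript
// Sketch of $bucket's assignment rule (illustrative, not MongoDB code).
function bucketOf(value, boundaries, defaultBucket) {
  for (let i = 0; i < boundaries.length - 1; i++) {
    // Each bucket covers [boundaries[i], boundaries[i + 1])
    if (value >= boundaries[i] && value < boundaries[i + 1]) {
      return boundaries[i]; // a bucket's _id is its lower boundary
    }
  }
  return defaultBucket; // out-of-range values
}

const boundaries = [0, 25, 50, 100, 200];
bucketOf(10, boundaries, "expensive");  // -> 0
bucketOf(50, boundaries, "expensive");  // -> 50 (lower bound is inclusive)
bucketOf(350, boundaries, "expensive"); // -> "expensive"
```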

Complex Aggregation Patterns

Time-Based Analytics

// Daily sales with a 7-day moving average
db.sales.aggregate([
  {
    $match: {
      date: {
        $gte: ISODate("2024-01-01"),
        $lt: ISODate("2025-01-01"), // the full 2024 calendar year
      },
    },
  },
  {
    $group: {
      _id: {
        $dateToString: { format: "%Y-%m-%d", date: "$date" },
      },
      dailySales: { $sum: "$amount" },
      orderCount: { $sum: 1 },
    },
  },
  { $sort: { _id: 1 } },
  {
    $setWindowFields: {
      sortBy: { _id: 1 },
      output: {
        movingAverage: {
          $avg: "$dailySales",
          window: {
            documents: [-6, 0], // current day plus the six preceding days
          },
        },
      },
    },
  },
]);
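The trailing window here (each day averaged with up to six preceding days) is easy to misread, so here is the same computation as a plain-JavaScript sketch. Near the start of the series the window is simply smaller, which matches $setWindowFields' behavior at partition boundaries:

```javascript
// Illustrative trailing moving average over a [-6, 0] documents window.
function movingAverage(values, before = 6) {
  return values.map((_, i) => {
    // Window: up to `before` preceding values plus the current one
    const windowVals = values.slice(Math.max(0, i - before), i + 1);
    return windowVals.reduce((sum, v) => sum + v, 0) / windowVals.length;
  });
}

const dailySales = [10, 20, 30];
movingAverage(dailySales); // -> [10, 15, 20]
```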

Text Analysis with $regex and $text

// Advanced text search and analysis
db.articles.aggregate([
  {
    $match: {
      $text: {
        $search: "mongodb aggregation pipeline",
        $caseSensitive: false,
      },
    },
  },
  {
    $addFields: {
      score: { $meta: "textScore" },
      wordCount: {
        $size: {
          $split: [{ $trim: { input: "$content" } }, " "],
        },
      },
      tags: {
        $regexFindAll: {
          input: "$content",
          // $regexFindAll already matches globally; the `g` flag is not a valid option
          regex: /#(\w+)/,
        },
      },
    },
  },
  {
    $project: {
      title: 1,
      score: 1,
      wordCount: 1,
      hashtags: {
        $map: {
          input: "$tags",
          as: "tag",
          // `captures` is an array per match; take the first capture group
          in: { $arrayElemAt: ["$$tag.captures", 0] },
        },
      },
    },
  },
]);

Conditional Logic with $cond and $switch

// Complex conditional processing
db.students.aggregate([
  {
    $addFields: {
      grade: {
        $switch: {
          branches: [
            { case: { $gte: ["$score", 90] }, then: "A" },
            { case: { $gte: ["$score", 80] }, then: "B" },
            { case: { $gte: ["$score", 70] }, then: "C" },
            { case: { $gte: ["$score", 60] }, then: "D" },
          ],
          default: "F",
        },
      },
      status: {
        $cond: { if: { $gte: ["$score", 60] }, then: "Pass", else: "Fail" },
      },
      performance: {
        $cond: {
          if: {
            $and: [{ $gte: ["$score", 85] }, { $gte: ["$attendance", 90] }],
          },
          then: "Excellent",
          else: {
            $cond: {
              if: { $gte: ["$score", 70] },
              then: "Good",
              else: "Needs Improvement",
            },
          },
        },
      },
    },
  },
]);
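$switch evaluates its branches in order and returns the first match, which is why the grade boundaries can overlap (a score of 95 satisfies every branch but takes "A"). The equivalent plain-JavaScript logic makes that short-circuit behavior explicit:

```javascript
// Plain-JS equivalent of the $switch grading logic above.
function grade(score) {
  if (score >= 90) return "A"; // first matching branch wins
  if (score >= 80) return "B";
  if (score >= 70) return "C";
  if (score >= 60) return "D";
  return "F"; // $switch's `default`
}

grade(95); // -> "A", even though every branch's condition holds
grade(59); // -> "F"
```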

Performance Optimization Techniques

Index Usage in Aggregation

  1. Early $match stages can use indexes effectively
  2. $sort stages benefit from compound indexes
  3. $lookup operations perform better with indexes on join fields
  4. Use explain() to verify index usage
// Check pipeline performance
db.orders
  .aggregate([
    { $match: { status: "shipped", date: { $gte: ISODate("2024-01-01") } } },
    { $group: { _id: "$customerId", total: { $sum: "$amount" } } },
    { $sort: { total: -1 } },
  ])
  .explain("executionStats");

// Optimize with a compound index
db.orders.createIndex({ status: 1, date: 1 }); // Supports the $match stage
// Note: $group itself does not use indexes directly, but an index on
// customerId helps pipelines that $sort or $lookup on that field
db.orders.createIndex({ customerId: 1 });

Memory Management

// Use allowDiskUse for large datasets
db.largeCollection.aggregate(
  [
    // ... complex pipeline stages
  ],
  {
    allowDiskUse: true,
    maxTimeMS: 300000, // 5-minute timeout
  }
);

// Limit the working set size
db.collection.aggregate([
  { $match: { /* early filtering */ } },
  { $project: { /* only needed fields */ } },
  { $limit: 1000 }, // Limit early if possible
  // ... other stages
]);

Pipeline Optimization Strategies

// Bad: expensive operations early
db.orders.aggregate([
  { $lookup: { /* expensive join */ } },
  { $match: { status: "active" } }, // Filter after the expensive join
  { $group: { /* grouping logic */ } },
]);

// Good: filter early, minimize data
db.orders.aggregate([
  { $match: { status: "active" } }, // Filter first
  { $project: { customerId: 1, amount: 1, date: 1 } }, // Only needed fields
  { $lookup: { /* join with less data */ } },
  { $group: { /* grouping logic */ } },
]);

Real-World Use Cases

E-commerce Analytics Dashboard

db.orders.aggregate([
  {
    $match: {
      date: {
        $gte: ISODate("2024-01-01"),
        $lt: ISODate("2025-01-01"), // the full 2024 calendar year
      },
    },
  },
  {
    $facet: {
      // Revenue metrics
      revenueMetrics: [
        {
          $group: {
            _id: null,
            totalRevenue: { $sum: "$totalAmount" },
            avgOrderValue: { $avg: "$totalAmount" },
            orderCount: { $sum: 1 },
          },
        },
      ],
      // Monthly trends
      monthlyTrends: [
        {
          $group: {
            _id: { year: { $year: "$date" }, month: { $month: "$date" } },
            revenue: { $sum: "$totalAmount" },
            orders: { $sum: 1 },
          },
        },
        { $sort: { "_id.year": 1, "_id.month": 1 } },
      ],
      // Top products
      topProducts: [
        { $unwind: "$items" },
        {
          $group: {
            _id: "$items.productId",
            totalSold: { $sum: "$items.quantity" },
            revenue: {
              $sum: { $multiply: ["$items.quantity", "$items.price"] },
            },
          },
        },
        { $sort: { revenue: -1 } },
        { $limit: 10 },
        {
          $lookup: {
            from: "products",
            localField: "_id",
            foreignField: "_id",
            as: "product",
          },
        },
      ],
      // Customer segments
      customerSegments: [
        {
          $group: {
            _id: "$customerId",
            totalSpent: { $sum: "$totalAmount" },
            orderCount: { $sum: 1 },
          },
        },
        {
          $bucket: {
            groupBy: "$totalSpent",
            boundaries: [0, 100, 500, 1000, 5000],
            default: "premium",
            output: {
              customers: { $sum: 1 },
              avgSpent: { $avg: "$totalSpent" },
            },
          },
        },
      ],
    },
  },
]);

Social Media Analytics

// Analyze user engagement patterns over the last 30 days
db.posts.aggregate([
  {
    $match: {
      createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
    },
  },
  {
    $lookup: {
      from: "interactions",
      localField: "_id",
      foreignField: "postId",
      as: "interactions",
    },
  },
  {
    $addFields: {
      // Weighted score; assumes each interaction document has a `type` field
      engagementScore: {
        $add: [
          {
            $size: {
              $filter: { input: "$interactions", cond: { $eq: ["$$this.type", "like"] } },
            },
          },
          {
            $multiply: [
              {
                $size: {
                  $filter: { input: "$interactions", cond: { $eq: ["$$this.type", "share"] } },
                },
              },
              3,
            ],
          },
          {
            $multiply: [
              {
                $size: {
                  $filter: { input: "$interactions", cond: { $eq: ["$$this.type", "comment"] } },
                },
              },
              2,
            ],
          },
        ],
      },
      hourPosted: { $hour: "$createdAt" },
      dayOfWeek: { $dayOfWeek: "$createdAt" },
    },
  },
  {
    $facet: {
      bestPostingTimes: [
        {
          $group: {
            _id: { hour: "$hourPosted", day: "$dayOfWeek" },
            avgEngagement: { $avg: "$engagementScore" },
            postCount: { $sum: 1 },
          },
        },
        { $sort: { avgEngagement: -1 } },
      ],
      topPerformingPosts: [
        { $sort: { engagementScore: -1 } },
        { $limit: 20 },
        {
          $project: {
            content: { $substrCP: ["$text", 0, 100] },
            engagementScore: 1,
            type: 1,
            hashtags: 1,
          },
        },
      ],
      hashtagAnalysis: [
        { $unwind: "$hashtags" },
        {
          $group: {
            _id: "$hashtags",
            usage: { $sum: 1 },
            avgEngagement: { $avg: "$engagementScore" },
          },
        },
        { $sort: { avgEngagement: -1 } },
      ],
    },
  },
]);

Troubleshooting Common Issues

Memory Limitations

// Error: aggregation pipeline exceeds the memory limit
// Solution: use allowDiskUse and rework memory-hungry accumulators

// Before: memory intensive ($push keeps every whole document per group)
db.collection.aggregate([
  { $group: { _id: "$field", data: { $push: "$$ROOT" } } },
]);

// After: memory efficient (only a counter per group), with disk spill allowed
db.collection.aggregate(
  [{ $group: { _id: "$field", count: { $sum: 1 } } }],
  { allowDiskUse: true }
);
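To see why the first pipeline blows past the memory limit, compare the two accumulator strategies in plain JavaScript (an illustrative sketch, not MongoDB's actual implementation): $push must hold every grouped document in memory at once, while $sum only needs one number per group.

```javascript
// Memory-hungry: the accumulator retains every document per group.
function groupPush(docs, key) {
  const groups = {};
  for (const doc of docs) {
    (groups[doc[key]] ??= []).push(doc); // working set grows with input size
  }
  return groups;
}

// Memory-efficient: the accumulator is a single counter per group.
function groupCount(docs, key) {
  const counts = {};
  for (const doc of docs) {
    counts[doc[key]] = (counts[doc[key]] ?? 0) + 1; // constant size per group
  }
  return counts;
}

const docs = [{ field: "a" }, { field: "a" }, { field: "b" }];
groupCount(docs, "field"); // -> { a: 2, b: 1 }
```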

Performance Bottlenecks

// Identify slow stages with explain
db.collection
  .aggregate([
    // ... pipeline stages
  ])
  .explain("executionStats");

// Look for:
// - High "executionTimeMillisEstimate" values
// - A large "totalDocsExamined" vs. "totalDocsReturned" ratio
// - Missing index usage in $match stages

Data Type Issues

// Handle mixed data types safely with $convert, which coerces values
// to the target type and falls back when conversion is impossible
db.collection.aggregate([
  {
    $addFields: {
      numericField: {
        $convert: {
          input: "$maybeNumber",
          to: "double",
          onError: 0, // non-numeric strings, objects, etc.
          onNull: 0, // missing or null values
        },
      },
    },
  },
]);

Best Practices Summary

  1. Design for performance - Place filtering stages early, limit data flow
  2. Use indexes effectively - Ensure $match and $sort can leverage indexes
  3. Monitor memory usage - Use allowDiskUse for large datasets
  4. Optimize $lookup operations - Index join fields and use pipelines when needed
  5. Test with realistic data - Validate performance with production-sized datasets
  6. Use explain() regularly - Monitor and optimize pipeline performance
  7. Consider alternatives - Sometimes multiple simple queries outperform complex pipelines

Conclusion

Advanced aggregation pipelines are powerful tools for complex data analysis in MongoDB. By understanding the various stages, optimization techniques, and real-world patterns, you can build efficient and scalable data processing solutions.

Remember to always consider performance implications, use appropriate indexes, and test your pipelines with realistic datasets. The aggregation framework’s flexibility allows for sophisticated data transformations, but with great power comes the responsibility to use it wisely.