Advanced Aggregation Pipelines
MongoDB’s aggregation framework is a powerful tool for data processing and analysis. This guide covers advanced techniques, optimization strategies, and complex use cases that go beyond basic queries.
Understanding the Aggregation Pipeline
The aggregation pipeline processes documents in stages, where each stage transforms the documents and passes the results to the next stage.
Pipeline Architecture
db.collection.aggregate([
  { $match: { /* filter criteria */ } },    // Stage 1: Filter
  { $group: { /* grouping logic */ } },     // Stage 2: Group
  { $sort: { /* sorting criteria */ } },    // Stage 3: Sort
  { $project: { /* field selection */ } },  // Stage 4: Project
  { $limit: 10 },                           // Stage 5: Limit
]);
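To make the stage-by-stage model concrete, here is a small plain-JavaScript sketch (no database required) that treats each stage as a function from an array of documents to a new array and chains them in order. The helper names (`runPipeline`, `match`, `group`, `sortByDesc`, `limit`) and the sample orders are hypothetical; they only mimic what the corresponding stages do and say nothing about how the server implements them.

```javascript
// Hypothetical in-memory model: each "stage" maps an array of documents to a new array.
function runPipeline(docs, stages) {
  // Feed the output of each stage into the next one, in order.
  return stages.reduce((current, stage) => stage(current), docs);
}

// Roughly like { $match: ... }: keep documents that satisfy a predicate.
const match = (predicate) => (docs) => docs.filter(predicate);

// Roughly like { $group: { _id: <key>, total: { $sum: <value> } } }.
const group = (keyFn, valueFn) => (docs) => {
  const totals = new Map();
  for (const doc of docs) {
    const key = keyFn(doc);
    totals.set(key, (totals.get(key) ?? 0) + valueFn(doc));
  }
  return [...totals].map(([_id, total]) => ({ _id, total }));
};

// Roughly like { $sort: { <field>: -1 } }.
const sortByDesc = (field) => (docs) => [...docs].sort((a, b) => b[field] - a[field]);

// Roughly like { $limit: n }.
const limit = (n) => (docs) => docs.slice(0, n);

const orders = [
  { customerId: "a", status: "shipped", amount: 30 },
  { customerId: "b", status: "shipped", amount: 50 },
  { customerId: "a", status: "pending", amount: 99 },
  { customerId: "a", status: "shipped", amount: 40 },
];

const result = runPipeline(orders, [
  match((o) => o.status === "shipped"),
  group((o) => o.customerId, (o) => o.amount),
  sortByDesc("total"),
  limit(1),
]);
console.log(result); // [ { _id: 'a', total: 70 } ]
```

Because every stage only sees the previous stage's output, moving the filter earlier shrinks the input of every later stage, which is the intuition behind the performance advice that follows.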
Performance Considerations
- Place $match early - Filter documents as early as possible to reduce pipeline workload
- Use $project to limit fields - Only include fields needed for subsequent stages
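- Leverage indexes - $match and $sort can use an index only when they occur at the beginning of the pipeline, before stages such as $group, $project, or $unwind reshape the documents
- Consider $limit placement - Apply $limit early only when doing so does not change the result; when $sort is immediately followed by $limit, MongoDB coalesces them so the sort only keeps the top N documents in memory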
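Placing `$limit` early is only safe when it does not change the answer. This plain-JavaScript sketch with hypothetical data shows why: limiting before sorting keeps whichever documents happen to arrive first, while sorting before limiting returns the true top N.

```javascript
// Hypothetical scores arriving in natural (insertion) order.
const docs = [
  { name: "p1", score: 10 },
  { name: "p2", score: 95 },
  { name: "p3", score: 40 },
  { name: "p4", score: 88 },
];

const byScoreDesc = (a, b) => b.score - a.score;

// Like [{ $limit: 2 }, { $sort: { score: -1 } }]: sorts an arbitrary subset.
const limitThenSort = docs.slice(0, 2).sort(byScoreDesc);

// Like [{ $sort: { score: -1 } }, { $limit: 2 }]: the actual top two.
const sortThenLimit = [...docs].sort(byScoreDesc).slice(0, 2);

console.log(limitThenSort.map((d) => d.name)); // [ 'p2', 'p1' ]
console.log(sortThenLimit.map((d) => d.name)); // [ 'p2', 'p4' ]
```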
Advanced Pipeline Stages
$lookup - Complex Joins
Perform left outer joins with related collections:
// Basic lookup
db.orders.aggregate([
  {
    $lookup: {
      from: "customers",
      localField: "customerId",
      foreignField: "_id",
      as: "customer",
    },
  },
]);
// Advanced lookup with pipeline
db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { orderItems: "$items" },
      pipeline: [
        {
          $match: {
            $expr: {
              $in: ["$_id", "$$orderItems.productId"],
            },
          },
        },
        { $project: { name: 1, price: 1, category: 1 } },
      ],
      as: "productDetails",
    },
  },
]);
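The key property of `$lookup` is that it never drops an input document: a document with no match still comes out, with an empty array in the `as` field. The following is a minimal plain-JavaScript model of the basic `localField`/`foreignField` form. The `lookup` function and the data are hypothetical, and the sketch uses plain `===` equality; the real stage also matches array-valued local fields element by element and treats a missing local field as null.

```javascript
// Hypothetical model of { $lookup: { from, localField, foreignField, as } }.
function lookup(docs, fromDocs, localField, foreignField, as) {
  return docs.map((doc) => ({
    ...doc,
    // All matching documents are collected into an array; no match gives [].
    [as]: fromDocs.filter((other) => other[foreignField] === doc[localField]),
  }));
}

const customers = [{ _id: 1, name: "Ada" }];
const orders = [
  { _id: 100, customerId: 1 },
  { _id: 101, customerId: 2 }, // no customer with _id 2
];

const joined = lookup(orders, customers, "customerId", "_id", "customer");
console.log(joined[0].customer.length); // 1
console.log(joined[1].customer.length); // 0 (the order is kept with an empty array)
```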
$facet - Multi-dimensional Analysis
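Run several sub-pipelines over the same set of input documents within a single stage. The output is one document with an array field per sub-pipeline, so the combined result must fit within the 16 MB document size limit, and the sub-pipelines cannot use indexes: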
db.sales.aggregate([
  {
    $facet: {
      // Pipeline 1: Sales by category
      salesByCategory: [
        { $group: { _id: "$category", total: { $sum: "$amount" } } },
        { $sort: { total: -1 } },
      ],
      // Pipeline 2: Monthly trends ($month alone merges the same month across years)
      monthlyTrends: [
        {
          $group: {
            _id: { $month: "$date" },
            totalSales: { $sum: "$amount" },
            avgOrder: { $avg: "$amount" },
          },
        },
      ],
      // Pipeline 3: Top customers
      topCustomers: [
        { $group: { _id: "$customerId", total: { $sum: "$amount" } } },
        { $sort: { total: -1 } },
        { $limit: 10 },
      ],
    },
  },
]);
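Conceptually, `$facet` hands the same input array to each named sub-pipeline and returns a single document whose fields hold each sub-pipeline's results. A hypothetical plain-JavaScript sketch of that shape (the `facet` helper and sample sales are illustrative only):

```javascript
// Hypothetical model of { $facet: { name1: [...], name2: [...] } }.
// Each sub-pipeline is a function from an array of documents to an array.
function facet(docs, subPipelines) {
  const output = {};
  for (const [name, pipeline] of Object.entries(subPipelines)) {
    // Every sub-pipeline sees the same, unmodified input documents.
    output[name] = pipeline(docs);
  }
  return [output]; // $facet emits exactly one document
}

const sales = [
  { category: "books", amount: 20 },
  { category: "games", amount: 60 },
  { category: "books", amount: 15 },
];

const [result] = facet(sales, {
  totalByCategory: (docs) => {
    const totals = {};
    for (const d of docs) totals[d.category] = (totals[d.category] ?? 0) + d.amount;
    return Object.entries(totals).map(([_id, total]) => ({ _id, total }));
  },
  orderCount: (docs) => [{ count: docs.length }],
});
console.log(result.orderCount); // [ { count: 3 } ]
```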
$graphLookup - Recursive Queries
Perform recursive searches in hierarchical data:
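// Find all direct and indirect reports in the organizational hierarchy
db.employees.aggregate([
  { $match: { name: "Alice Johnson" } },
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",           // begin with Alice's _id
      connectFromField: "_id",     // each match's _id ...
      connectToField: "managerId", // ... is matched against managerId
      as: "allReports",
      maxDepth: 5,                 // depths 0-5; direct reports are depth 0
      depthField: "level",
    },
  },
]);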
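// Find flight legs reachable from JFK in up to four legs (depths 0-3).
// $graphLookup returns every reachable document, not a shortest path;
// the smallest "stops" value per destination gives the fewest connections.
db.flights.aggregate([
  { $limit: 1 }, // startWith is a constant, so one seed document is enough
  {
    $graphLookup: {
      from: "flights",
      startWith: "JFK",
      connectFromField: "to",
      connectToField: "from",
      as: "connections",
      maxDepth: 3,
      depthField: "stops",
      restrictSearchWithMatch: { date: { $gte: new Date() } },
    },
  },
]);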
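`$graphLookup` behaves much like a breadth-first search: it matches `startWith` against `connectToField`, then repeatedly feeds each match's `connectFromField` back in until `maxDepth` is reached, recording the depth in `depthField`. The hypothetical plain-JavaScript sketch below models that traversal over sample flight data and then derives the fewest stops to each destination, the post-processing a "shortest path" question needs. It ignores `restrictSearchWithMatch` and assumes each document has a unique `_id`; it is not the server's implementation.

```javascript
// Hypothetical BFS model of $graphLookup.
function graphLookup(collection, { startWith, connectFromField, connectToField, maxDepth, depthField }) {
  const found = new Map(); // _id -> result document (each document appears once)
  let frontier = [startWith];
  for (let depth = 0; depth <= maxDepth && frontier.length > 0; depth++) {
    const next = [];
    for (const value of frontier) {
      for (const doc of collection) {
        if (doc[connectToField] === value && !found.has(doc._id)) {
          found.set(doc._id, { ...doc, [depthField]: depth });
          next.push(doc[connectFromField]);
        }
      }
    }
    frontier = next;
  }
  return [...found.values()];
}

const flights = [
  { _id: 1, from: "JFK", to: "ORD" },
  { _id: 2, from: "ORD", to: "SFO" },
  { _id: 3, from: "JFK", to: "LAX" },
  { _id: 4, from: "LAX", to: "SFO" },
  { _id: 5, from: "SFO", to: "HNL" },
];

const connections = graphLookup(flights, {
  startWith: "JFK",
  connectFromField: "to",
  connectToField: "from",
  maxDepth: 3,
  depthField: "stops",
});

// Fewest stops per destination: the minimum depth among legs arriving there.
const fewestStops = {};
for (const leg of connections) {
  fewestStops[leg.to] = Math.min(fewestStops[leg.to] ?? Infinity, leg.stops);
}
console.log(fewestStops); // { ORD: 0, LAX: 0, SFO: 1, HNL: 2 }
```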
$bucket and $bucketAuto - Data Segmentation
Group documents into buckets for analysis:
// Custom bucket boundaries
db.products.aggregate([
  {
    $bucket: {
      groupBy: "$price",
      boundaries: [0, 25, 50, 100, 200],
      default: "expensive",
      output: {
        count: { $sum: 1 },
        products: { $push: "$name" },
        avgPrice: { $avg: "$price" },
      },
    },
  },
]);
// Automatic bucket creation
db.orders.aggregate([
  {
    $bucketAuto: {
      groupBy: "$totalAmount",
      buckets: 5,
      output: {
        count: { $sum: 1 },
        avgAmount: { $avg: "$totalAmount" },
        orders: { $push: "$$ROOT" },
      },
    },
  },
]);
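`$bucket` assigns each document to the half-open range `[lower, upper)` between consecutive boundaries and uses the lower boundary as the bucket `_id`; values outside every range (or of another type) go to the `default` bucket. Without a `default`, the real stage raises an error for such documents. A hypothetical plain-JavaScript model of the price example:

```javascript
// Hypothetical model of $bucket's assignment rule (not the server implementation).
function bucketId(value, boundaries, defaultId) {
  if (typeof value === "number") {
    for (let i = 0; i < boundaries.length - 1; i++) {
      // Lower bound inclusive, upper bound exclusive.
      if (value >= boundaries[i] && value < boundaries[i + 1]) return boundaries[i];
    }
  }
  return defaultId; // below the first boundary, at/above the last, or non-numeric
}

const boundaries = [0, 25, 50, 100, 200];
const prices = [0, 24.99, 25, 99, 200, 350];
console.log(JSON.stringify(prices.map((p) => bucketId(p, boundaries, "expensive"))));
// [0,0,25,50,"expensive","expensive"]
```

Note that a price of exactly 200 lands in `"expensive"`, because the last boundary is exclusive.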
Complex Aggregation Patterns
Time-Based Analytics
// Daily sales with a 7-day moving average
db.sales.aggregate([
  {
    $match: {
      date: {
        $gte: ISODate("2024-01-01"),
        $lt: ISODate("2025-01-01"), // exclusive upper bound keeps December 31
      },
    },
  },
  {
    $group: {
      _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } },
      dailySales: { $sum: "$amount" },
      orderCount: { $sum: 1 },
    },
  },
  { $sort: { _id: 1 } }, // "YYYY-MM-DD" strings sort chronologically
  {
    $setWindowFields: {
      sortBy: { _id: 1 },
      output: {
        movingAverage: {
          $avg: "$dailySales",
          // Current document plus the six before it. Days without sales produce
          // no document, so the window can span more than 7 calendar days.
          window: { documents: [-6, 0] },
        },
      },
    },
  },
]);
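A position-based trailing window, written in `$setWindowFields` as `window: { documents: [-6, 0] }`, averages the current document and up to six preceding ones in `sortBy` order; near the start of the data the window is simply shorter. This plain-JavaScript sketch (hypothetical `trailingAverage` helper and sample totals) reproduces that arithmetic:

```javascript
// Hypothetical model of $avg over a documents: [-6, 0] window.
function trailingAverage(values, lookback = 6) {
  return values.map((_, i) => {
    // Up to `lookback` previous values plus the current one.
    const window = values.slice(Math.max(0, i - lookback), i + 1);
    return window.reduce((sum, v) => sum + v, 0) / window.length;
  });
}

const dailySales = [10, 20, 30, 40, 50, 60, 70, 80];
console.log(trailingAverage(dailySales).join(", "));
// 10, 15, 20, 25, 30, 35, 40, 50
```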
Text Analysis with $regexFindAll and $text
// Text search plus simple content analysis (requires a text index on articles)
db.articles.aggregate([
  {
    $match: {
      $text: { $search: "mongodb aggregation pipeline", $caseSensitive: false },
    },
  },
  {
    $addFields: {
      score: { $meta: "textScore" },
      // Approximate: counts space-separated tokens
      wordCount: {
        $size: { $split: [{ $trim: { input: "$content" } }, " "] },
      },
      // $regexFindAll already returns every match, so no "g" flag is needed
      tags: {
        $regexFindAll: { input: "$content", regex: /#(\w+)/ },
      },
    },
  },
  {
    $project: {
      title: 1,
      score: 1,
      wordCount: 1,
      hashtags: {
        $map: {
          input: "$tags",
          as: "tag",
          // captures is an array; keep the first capture group
          in: { $arrayElemAt: ["$$tag.captures", 0] },
        },
      },
    },
  },
]);
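The `$map` step exists because `$regexFindAll` returns one object per match, containing the matched text (`match`), its position (`idx`), and a `captures` array with one entry per capture group. The hypothetical sketch below approximates that shape with JavaScript's `String.prototype.matchAll` (omitting `idx`) and then keeps the first capture of each match; in a pipeline, `{ $arrayElemAt: ["$$tag.captures", 0] }` performs that last step.

```javascript
// Hypothetical approximation of $regexFindAll's output shape (idx omitted).
function regexFindAll(input, regex) {
  const global = new RegExp(regex.source, "g"); // matchAll requires the g flag
  return [...input.matchAll(global)].map((m) => ({
    match: m[0],
    captures: m.slice(1), // one entry per capture group
  }));
}

const content = "Tuning #mongodb pipelines with #aggregation tricks";
const tags = regexFindAll(content, /#(\w+)/);

// Flatten each match to its first capture group.
const hashtags = tags.map((tag) => tag.captures[0]);
console.log(hashtags); // [ 'mongodb', 'aggregation' ]
```

Mapping `captures` directly, without taking element 0, would yield an array of single-element arrays instead of plain strings.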
Conditional Logic with $cond and $switch
// Complex conditional processing
db.students.aggregate([
  {
    $addFields: {
      grade: {
        $switch: {
          branches: [
            { case: { $gte: ["$score", 90] }, then: "A" },
            { case: { $gte: ["$score", 80] }, then: "B" },
            { case: { $gte: ["$score", 70] }, then: "C" },
            { case: { $gte: ["$score", 60] }, then: "D" },
          ],
          default: "F",
        },
      },
      status: {
        $cond: {
          if: { $gte: ["$score", 60] },
          then: "Pass",
          else: "Fail",
        },
      },
      performance: {
        $cond: {
          if: {
            $and: [{ $gte: ["$score", 85] }, { $gte: ["$attendance", 90] }],
          },
          then: "Excellent",
          else: {
            $cond: {
              if: { $gte: ["$score", 70] },
              then: "Good",
              else: "Needs Improvement",
            },
          },
        },
      },
    },
  },
]);
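`$switch` evaluates its branches in order and returns the `then` of the first `case` that is true, falling back to `default`; that ordering is why descending thresholds work without explicit upper bounds. A plain-JavaScript mirror of the `grade` and `performance` logic (the function names are hypothetical) makes the boundaries easy to test:

```javascript
// Mirrors the $switch branches: the first true case wins.
function grade(score) {
  const branches = [
    [90, "A"],
    [80, "B"],
    [70, "C"],
    [60, "D"],
  ];
  for (const [min, letter] of branches) {
    if (score >= min) return letter;
  }
  return "F"; // default
}

// Mirrors the nested $cond used for the performance field.
function performanceLabel(score, attendance) {
  if (score >= 85 && attendance >= 90) return "Excellent";
  return score >= 70 ? "Good" : "Needs Improvement";
}

console.log([95, 90, 89.5, 60, 59].map(grade)); // [ 'A', 'A', 'B', 'D', 'F' ]
console.log(performanceLabel(88, 85)); // Good (attendance below 90)
```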
Performance Optimization Techniques
Index Usage in Aggregation
- Early $match stages can use indexes effectively
- $sort stages can use an index only when they are not preceded by stages such as $project, $unwind, or $group
- $lookup operations perform better with an index on the foreignField in the joined collection
- Use explain() to verify index usage
// Check pipeline performance
db.orders.explain("executionStats").aggregate([
  { $match: { status: "shipped", date: { $gte: ISODate("2024-01-01") } } },
  { $group: { _id: "$customerId", total: { $sum: "$amount" } } },
  { $sort: { total: -1 } },
]);
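// Optimize with a compound index: equality field (status) first, range field (date) second
db.orders.createIndex({ status: 1, date: 1 }); // supports the $match
// An index on customerId alone would not help here: this $group consumes
// the output of $match and does not use an index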
Memory Management
// Use allowDiskUse for large datasets
db.largeCollection.aggregate(
  [
    // ... complex pipeline stages
  ],
  {
    allowDiskUse: true, // lets stages such as $group and $sort spill to temporary files past the 100 MB per-stage limit
    maxTimeMS: 300000,  // 5 minute timeout
  }
);
// Limit working set size
db.collection.aggregate([
  { $match: { /* early filtering */ } },
  { $project: { /* only needed fields */ } },
  { $limit: 1000 }, // only if truncating here does not change the result you need
  // ... other stages
]);
Pipeline Optimization Strategies
// Bad: expensive operations early
db.orders.aggregate([
  { $lookup: { /* expensive join */ } },
  { $match: { status: "active" } }, // filter after the expensive join
  { $group: { /* grouping logic */ } },
]);
// Good: filter early, minimize data
db.orders.aggregate([
  { $match: { status: "active" } }, // filter first
  { $project: { customerId: 1, amount: 1, date: 1 } }, // only needed fields
  { $lookup: { /* join with less data */ } },
  { $group: { /* grouping logic */ } },
]);
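The benefit of filtering first comes from how many documents reach the expensive stage. This hypothetical plain-JavaScript sketch counts the join probes each ordering performs: both orderings produce the same documents, but the filter-first version probes far less. It models only the amount of work, not MongoDB's actual cost model.

```javascript
// Hypothetical data: 1,000 orders, 10% of them active, 50 customers.
const orders = Array.from({ length: 1000 }, (_, i) => ({
  _id: i,
  customerId: i % 50,
  status: i % 10 === 0 ? "active" : "closed",
}));
const customersById = new Map(
  Array.from({ length: 50 }, (_, i) => [i, { _id: i, name: `c${i}` }])
);

let probes = 0;
// Stand-in for $lookup: one probe per input document.
const join = (docs) =>
  docs.map((o) => {
    probes++;
    return { ...o, customer: [customersById.get(o.customerId)] };
  });
const isActive = (o) => o.status === "active";

probes = 0;
const joinFirst = join(orders).filter(isActive); // like $lookup then $match
const joinFirstProbes = probes;

probes = 0;
const filterFirst = join(orders.filter(isActive)); // like $match then $lookup
const filterFirstProbes = probes;

console.log(joinFirst.length, filterFirst.length); // 100 100
console.log(joinFirstProbes, filterFirstProbes); // 1000 100
```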
Real-World Use Cases
E-commerce Analytics Dashboard
db.orders.aggregate([
  {
    $match: {
      date: {
        $gte: ISODate("2024-01-01"),
        $lt: ISODate("2025-01-01"), // exclusive upper bound keeps December 31
      },
    },
  },
  {
    $facet: {
      // Revenue metrics
      revenueMetrics: [
        {
          $group: {
            _id: null,
            totalRevenue: { $sum: "$totalAmount" },
            avgOrderValue: { $avg: "$totalAmount" },
            orderCount: { $sum: 1 },
          },
        },
      ],

      // Monthly trends
      monthlyTrends: [
        {
          $group: {
            _id: { year: { $year: "$date" }, month: { $month: "$date" } },
            revenue: { $sum: "$totalAmount" },
            orders: { $sum: 1 },
          },
        },
        { $sort: { "_id.year": 1, "_id.month": 1 } },
      ],

      // Top products
      topProducts: [
        { $unwind: "$items" },
        {
          $group: {
            _id: "$items.productId",
            totalSold: { $sum: "$items.quantity" },
            revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
          },
        },
        { $sort: { revenue: -1 } },
        { $limit: 10 },
        {
          $lookup: {
            from: "products",
            localField: "_id",
            foreignField: "_id",
            as: "product",
          },
        },
      ],

      // Customer segments
      customerSegments: [
        {
          $group: {
            _id: "$customerId",
            totalSpent: { $sum: "$totalAmount" },
            orderCount: { $sum: 1 },
          },
        },
        {
          $bucket: {
            groupBy: "$totalSpent",
            boundaries: [0, 100, 500, 1000, 5000],
            default: "premium", // anything outside the boundaries, e.g. 5000 or more
            output: {
              customers: { $sum: 1 },
              avgSpent: { $avg: "$totalSpent" },
            },
          },
        },
      ],
    },
  },
]);
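A common slip in date filters is an inclusive-looking upper bound such as `$lt: ISODate("2024-12-31")`, which silently drops the last day of the year. One way to avoid it is to build the range as a half-open interval from January 1 to January 1 of the following year. The helper below (`yearMatchStage` is a hypothetical name) returns a plain `$match` stage object using JavaScript `Date` values, which the shell and drivers send as BSON dates:

```javascript
// Hypothetical helper: half-open UTC range [Jan 1 of year, Jan 1 of year + 1).
function yearMatchStage(field, year) {
  return {
    $match: {
      [field]: {
        $gte: new Date(Date.UTC(year, 0, 1)),
        $lt: new Date(Date.UTC(year + 1, 0, 1)), // exclusive, so all of Dec 31 is included
      },
    },
  };
}

const stage = yearMatchStage("date", 2024);
console.log(stage.$match.date.$gte.toISOString()); // 2024-01-01T00:00:00.000Z
console.log(stage.$match.date.$lt.toISOString());  // 2025-01-01T00:00:00.000Z
```

The stage can then lead a pipeline, for example `db.orders.aggregate([yearMatchStage("date", 2024), /* $facet ... */])`.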
Social Media Analytics
// Analyze user engagement patterns over the last 30 days
db.posts.aggregate([
  {
    $match: {
      createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
    },
  },
  {
    $lookup: {
      from: "interactions",
      localField: "_id",
      foreignField: "postId",
      as: "interactions",
    },
  },
  {
    $addFields: {
      // Assumes each interaction document has a type field: "like", "share", or "comment"
      engagementScore: {
        $add: [
          { $multiply: [{ $size: { $filter: { input: "$interactions", cond: { $eq: ["$$this.type", "like"] } } } }, 1] },
          { $multiply: [{ $size: { $filter: { input: "$interactions", cond: { $eq: ["$$this.type", "share"] } } } }, 3] },
          { $multiply: [{ $size: { $filter: { input: "$interactions", cond: { $eq: ["$$this.type", "comment"] } } } }, 2] },
        ],
      },
      hourPosted: { $hour: "$createdAt" },     // UTC unless a timezone is given
      dayOfWeek: { $dayOfWeek: "$createdAt" }, // 1 = Sunday ... 7 = Saturday
    },
  },
  {
    $facet: {
      bestPostingTimes: [
        {
          $group: {
            _id: { hour: "$hourPosted", day: "$dayOfWeek" },
            avgEngagement: { $avg: "$engagementScore" },
            postCount: { $sum: 1 },
          },
        },
        { $sort: { avgEngagement: -1 } },
      ],

      topPerformingPosts: [
        { $sort: { engagementScore: -1 } },
        { $limit: 20 },
        {
          $project: {
            content: { $substrCP: ["$text", 0, 100] }, // first 100 characters
            engagementScore: 1,
            type: 1,
            hashtags: 1,
          },
        },
      ],

      hashtagAnalysis: [
        { $unwind: "$hashtags" },
        {
          $group: {
            _id: "$hashtags",
            usage: { $sum: 1 },
            avgEngagement: { $avg: "$engagementScore" },
          },
        },
        { $sort: { avgEngagement: -1 } },
      ],
    },
  },
]);
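The engagement formula weights likes, comments, and shares as 1, 2, and 3. Assuming a hypothetical schema in which each interaction document carries a `type` field of `"like"`, `"share"`, or `"comment"`, the score for a single post can be checked in plain JavaScript before trusting the pipeline (the `engagementScore` function and data are illustrative):

```javascript
// Weights from the pipeline: like = 1, share = 3, comment = 2.
const WEIGHTS = { like: 1, share: 3, comment: 2 };

// Hypothetical schema: each interaction is { postId, type }.
function engagementScore(interactions) {
  // Unknown types contribute nothing.
  return interactions.reduce((score, i) => score + (WEIGHTS[i.type] ?? 0), 0);
}

const interactions = [
  { postId: 1, type: "like" },
  { postId: 1, type: "like" },
  { postId: 1, type: "share" },
  { postId: 1, type: "comment" },
];
console.log(engagementScore(interactions)); // 7
```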
Troubleshooting Common Issues
Memory Limitations
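// Error: a stage such as $group exceeds its memory limit
// Solution: accumulate less per group, and allow spilling to disk

// Before: memory intensive (each group holds full copies of its documents)
db.collection.aggregate([
  { $group: { _id: "$field", data: { $push: "$$ROOT" } } },
]);

// After: memory efficient (keep only the aggregates you actually need)
db.collection.aggregate(
  [{ $group: { _id: "$field", count: { $sum: 1 } } }],
  { allowDiskUse: true }
);
// Note: allowDiskUse does not lift the 16 MB limit on each output document,
// so pushing $$ROOT can still fail for very large groups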
Performance Bottlenecks
// Identify slow stages with explain
db.collection.explain("executionStats").aggregate([
  // ... pipeline stages
]);

// Look for:
// - High "executionTimeMillisEstimate" values on individual stages
// - A high "totalDocsExamined" relative to "nReturned"
// - COLLSCAN instead of IXSCAN for the initial $match
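Explain output is deeply nested, and its exact layout varies by server version and by how much of the pipeline is pushed down to the query engine, so a small recursive walk is a convenient way to spot collection scans. This sketch collects every `stage` value it finds; the `collectStages` name and the heavily trimmed sample document are hypothetical. With `executionStats`, the same plan appears under both `queryPlanner` and `executionStats`, so stage names may be listed twice.

```javascript
// Recursively collect every "stage" field in an explain() result.
function collectStages(node, found = []) {
  if (Array.isArray(node)) {
    for (const item of node) collectStages(item, found);
  } else if (node !== null && typeof node === "object") {
    if (typeof node.stage === "string") found.push(node.stage);
    for (const value of Object.values(node)) collectStages(value, found);
  }
  return found;
}

// Trimmed, hypothetical explain fragment for illustration only.
const explainOutput = {
  queryPlanner: {
    winningPlan: { stage: "SORT", inputStage: { stage: "COLLSCAN" } },
  },
};

const stages = collectStages(explainOutput);
console.log(stages); // [ 'SORT', 'COLLSCAN' ]
console.log(stages.includes("COLLSCAN") ? "collection scan found" : "no collection scan");
```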
Data Type Issues
// Handle mixed data types safely
db.collection.aggregate([
  {
    $addFields: {
      numericField: {
        $convert: {
          input: "$maybeNumber",
          to: "double",
          onError: 0, // e.g. strings that are not numbers
          onNull: 0,  // null or missing field
        },
      },
    },
  },
]);
Best Practices Summary
- Design for performance - Place filtering stages early, limit data flow
- Use indexes effectively - Ensure $match and $sort can leverage indexes
- Monitor memory usage - Use allowDiskUse for large datasets
- Optimize $lookup operations - Index the foreignField in the joined collection, and use a $lookup sub-pipeline to return only the joined fields you need
- Test with realistic data - Validate performance with production-sized datasets
- Use explain() regularly - Monitor and optimize pipeline performance
- Consider alternatives - Sometimes multiple simple queries outperform complex pipelines
Conclusion
Advanced aggregation pipelines are powerful tools for complex data analysis in MongoDB. By understanding the various stages, optimization techniques, and real-world patterns, you can build efficient and scalable data processing solutions.
Remember to always consider performance implications, use appropriate indexes, and test your pipelines with realistic datasets. The aggregation framework’s flexibility allows for sophisticated data transformations, but with great power comes the responsibility to use it wisely.