Streams
What are Node.js Streams?
Streams are one of Node.js’s most powerful features for handling data flow efficiently. Think of streams as conveyor belts in a factory - they process data piece by piece rather than waiting for the entire dataset to be available. This approach makes Node.js applications highly memory-efficient and responsive.
The Stream Advantage
Without Streams (Traditional Approach):
const fs = require("fs");
// ❌ Memory intensive - loads entire file into memory
const data = fs.readFileSync("10GB-file.txt");
console.log("File size:", data.length); // Waits for entire file to load
With Streams (Efficient Approach):
const fs = require("fs");
// ✅ Memory efficient - processes data in chunks
const stream = fs.createReadStream("10GB-file.txt");
let totalSize = 0;

stream.on("data", (chunk) => {
  totalSize += chunk.length;
  console.log(`Processed: ${chunk.length} bytes`); // Real-time processing
});

stream.on("end", () => {
  console.log("Total file size:", totalSize);
});
Key Benefits
- Memory Efficiency: Process large files with minimal RAM usage
- Performance: Start processing data before it’s fully available
- Scalability: Handle thousands of concurrent operations
- Composability: Chain operations together like UNIX pipes
Types of Streams
Node.js provides four fundamental stream types:
1. Readable Streams
Readable streams are data producers - they generate or provide data that can be consumed.
Common Examples:
- File reading (fs.createReadStream())
- HTTP request bodies (req in Express)
- Process input (process.stdin)
- Database query results
const fs = require("fs");
const readableStream = fs.createReadStream("data.txt", {
  encoding: "utf8",
  highWaterMark: 16 * 1024, // 16KB chunks
});

readableStream.on("data", (chunk) => {
  console.log("📥 Received chunk:", chunk.length, "bytes");
});

readableStream.on("end", () => {
  console.log("✅ Reading complete");
});

readableStream.on("error", (error) => {
  console.error("❌ Read error:", error);
});
2. Writable Streams
Writable streams are data consumers - they accept and process incoming data.
Common Examples:
- File writing (fs.createWriteStream())
- HTTP response bodies (res in Express)
- Process output (process.stdout)
- Database write operations
const fs = require("fs");
const writableStream = fs.createWriteStream("output.txt");
// Writing data
writableStream.write("Hello ");
writableStream.write("World!\n");
writableStream.write("Node.js Streams are powerful!");

// Finish writing
writableStream.end();

writableStream.on("finish", () => {
  console.log("✅ Writing complete");
});

writableStream.on("error", (error) => {
  console.error("❌ Write error:", error);
});
3. Duplex Streams
Duplex streams can both read and write data independently - like a telephone connection where both parties can speak and listen simultaneously.
Common Examples:
- TCP sockets (net.Socket)
- WebSocket connections
- Child process stdio
- Crypto cipher/decipher streams
const net = require("net");
// Create a TCP server with duplex streams
const server = net.createServer((socket) => {
  console.log("🔗 Client connected");

  // Write welcome message
  socket.write("Welcome to the echo server!\n");

  // Read incoming data and echo back
  socket.on("data", (data) => {
    console.log("📨 Received:", data.toString().trim());
    socket.write(`Echo: ${data}`); // Write back to same stream
  });

  socket.on("end", () => {
    console.log("👋 Client disconnected");
  });
});

server.listen(8080, () => {
  console.log("🚀 Echo server running on port 8080");
});
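The client side of this connection is the same kind of duplex stream: net.connect() returns a net.Socket you can write to and read from. A minimal sketch that talks to the echo server above (port 8080 and localhost are assumed from that example):

const net = require("net");

// net.connect() returns a net.Socket, which is a duplex stream
const client = net.connect(8080, "localhost", () => {
  client.write("Hello from the client!\n"); // Write side
});

client.on("data", (data) => {
  console.log("Server says:", data.toString().trim()); // Read side
  client.end(); // Close the connection after the first reply
});

client.on("end", () => {
  console.log("Disconnected from server");
});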
4. Transform Streams
Transform streams are special duplex streams that modify data as it passes through - like a filter or processor in a pipeline.
Common Examples:
- Data compression (zlib.createGzip())
- Encryption (crypto.createCipheriv())
- Data parsing (JSON, CSV transformers)
- Custom data processors
const { Transform } = require("stream");const fs = require("fs");
// Custom transform streamclass JSONFormatter extends Transform { constructor() { super({ objectMode: true }); // Handle objects instead of buffers }
_transform(chunk, encoding, callback) { try { const data = JSON.parse(chunk.toString()); const formatted = { ...data, timestamp: new Date().toISOString(), processed: true, };
callback(null, JSON.stringify(formatted, null, 2) + "\n"); } catch (error) { callback(error); } }}
// Usage exampleconst input = fs.createReadStream("raw-data.json");const formatter = new JSONFormatter();const output = fs.createWriteStream("formatted-data.json");
input.pipe(formatter).pipe(output);
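For simple cases you can skip subclassing and pass a transform() function directly to the Transform constructor. A minimal sketch that upper-cases text as it flows through (the file names are illustrative):

const { Transform } = require("stream");
const fs = require("fs");

// Inline transform: upper-case every chunk of text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

fs.createReadStream("input.txt")
  .pipe(upperCase)
  .pipe(fs.createWriteStream("output-uppercase.txt"));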
Stream Methods and Events
Essential Methods
Readable Stream Methods
const stream = fs.createReadStream("file.txt");
// Manual readingconst chunk = stream.read(1024); // Read up to 1KB
// Flow controlstream.pause(); // Pause data flowstream.resume(); // Resume data flow
// Cleanupstream.destroy(); // Destroy the stream
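read() is usually combined with the 'readable' event, which fires whenever data can be pulled from the internal buffer. A minimal sketch of this pull-based pattern (the file name is illustrative):

const fs = require("fs");

const paced = fs.createReadStream("file.txt");

// Pull data explicitly whenever the internal buffer has something to read
paced.on("readable", () => {
  let chunk;
  while ((chunk = paced.read()) !== null) {
    console.log("Pulled", chunk.length, "bytes");
  }
});

paced.on("end", () => {
  console.log("No more data");
});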
Writable Stream Methods
const fs = require("fs");

const stream = fs.createWriteStream("file.txt");

// Writing data
const success = stream.write("data"); // Returns false if backpressure

// Buffering control
stream.cork(); // Buffer writes
stream.uncork(); // Flush buffered writes

// Finishing
stream.end("final data"); // End with optional final chunk
Piping (Universal)
readableStream.pipe(writableStream); // Connect streams
readableStream.unpipe(writableStream); // Disconnect streams
Critical Events
For All Streams
- 'error': Emitted when an error occurs
- 'close': The stream has been closed
Readable Stream Events
- 'data': New data is available
- 'end': No more data will be provided
- 'readable': Data is available to be read
Writable Stream Events
- 'drain': Safe to write again after backpressure
- 'finish': All data has been flushed
- 'pipe': A readable stream has been piped to this writable
const fs = require("fs");
const writeStream = fs.createWriteStream("output.txt");
// Handle backpressure properly
function writeWithBackpressure(stream, data) {
  return new Promise((resolve, reject) => {
    const success = stream.write(data);

    if (success) {
      // No backpressure - can continue immediately
      resolve();
    } else {
      // Backpressure occurred - wait for drain event
      stream.once("drain", resolve);
      stream.once("error", reject);
    }
  });
}

// Usage
async function writeLotsOfData() {
  for (let i = 0; i < 100000; i++) {
    await writeWithBackpressure(writeStream, `Line ${i}\n`);
  }
  writeStream.end();
}
Piping and Pipeline Patterns
Basic Piping
Piping connects streams together, automatically handling data flow and backpressure:
const fs = require("fs");const zlib = require("zlib");
// Simple pipe chainfs.createReadStream("input.txt") .pipe(zlib.createGzip()) // Compress .pipe(fs.createWriteStream("output.gz")); // Save compressed file
console.log("🔄 Compression started...");
Advanced Pipeline with Error Handling
const { pipeline } = require("stream");const fs = require("fs");const zlib = require("zlib");const crypto = require("crypto");
// Robust pipeline with automatic error handling and cleanuppipeline( fs.createReadStream("sensitive-data.txt"), zlib.createGzip(), // Compress crypto.createCipher("aes-256-cbc", "secret-key"), // Encrypt fs.createWriteStream("secure-backup.gz"), (error) => { if (error) { console.error("❌ Pipeline failed:", error); } else { console.log("✅ Data compressed, encrypted, and saved"); } });
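Newer Node.js versions (15+) also ship a promise-based pipeline in the stream/promises module, which fits naturally with async/await. A minimal sketch using a single compression step (the file names are illustrative):

const { pipeline } = require("stream/promises");
const fs = require("fs");
const zlib = require("zlib");

async function compressFile() {
  try {
    // Awaiting the pipeline resolves when every stream has finished
    await pipeline(
      fs.createReadStream("input.txt"),
      zlib.createGzip(),
      fs.createWriteStream("input.txt.gz")
    );
    console.log("✅ Compression finished");
  } catch (error) {
    console.error("❌ Pipeline failed:", error);
  }
}

compressFile();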
Custom Transform Chains
const { Transform, pipeline } = require("stream");
const fs = require("fs");

// CSV to JSON transformer
// Note: for simplicity this assumes chunk boundaries fall on line breaks
class CSVToJSON extends Transform {
  constructor() {
    super({ objectMode: true });
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split("\n");

    for (const line of lines) {
      if (!line.trim()) continue;

      const values = line.split(",");

      if (!this.headers) {
        this.headers = values;
      } else {
        const obj = {};
        this.headers.forEach((header, index) => {
          obj[header.trim()] = values[index]?.trim();
        });
        this.push(JSON.stringify(obj) + "\n");
      }
    }

    callback();
  }
}

// Data validator transformer
class DataValidator extends Transform {
  _transform(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk.toString());

      // Validation logic
      if (data.email && data.email.includes("@")) {
        this.push(chunk); // Valid data
      }
      // Invalid data is filtered out
    } catch (error) {
      // Skip malformed JSON
    }

    callback();
  }
}

// Complete processing pipeline
pipeline(
  fs.createReadStream("users.csv"),
  new CSVToJSON(),
  new DataValidator(),
  fs.createWriteStream("valid-users.json"),
  (error) => {
    if (error) {
      console.error("Processing failed:", error);
    } else {
      console.log("✅ CSV converted and validated");
    }
  }
);
Real-World Applications
1. High-Performance File Upload Handler
const express = require("express");
const fs = require("fs");
const path = require("path");
const crypto = require("crypto");

const app = express();

app.post("/upload", (req, res) => {
  const uploadDir = "uploads";
  const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  const filepath = path.join(uploadDir, filename);

  // Ensure upload directory exists
  if (!fs.existsSync(uploadDir)) {
    fs.mkdirSync(uploadDir, { recursive: true });
  }

  // Create write stream and hash calculator
  const writeStream = fs.createWriteStream(filepath);
  const hash = crypto.createHash("sha256");

  let uploadSize = 0;

  req.on("data", (chunk) => {
    uploadSize += chunk.length;
    hash.update(chunk);
    writeStream.write(chunk);
  });

  req.on("end", () => {
    writeStream.end();

    const fileHash = hash.digest("hex");

    res.json({
      success: true,
      filename,
      size: uploadSize,
      hash: fileHash,
      message: "Upload completed successfully",
    });
  });

  req.on("error", (error) => {
    writeStream.destroy();
    fs.unlink(filepath, () => {}); // Clean up partial file
    res.status(500).json({ error: "Upload failed" });
  });
});

app.listen(3000, () => {
  console.log("🚀 Upload server running on port 3000");
});
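The handler above writes chunks manually without checking the return value of write(). A hedged alternative is to let pipeline() wire the request body straight to the file and manage backpressure and error cleanup for you; the route name is illustrative, and app, fs, and path come from the example above:

const { pipeline } = require("stream");

// Alternative upload route: pipeline() handles backpressure and errors
app.post("/upload-piped", (req, res) => {
  const filepath = path.join("uploads", `${Date.now()}-upload`);
  const writeStream = fs.createWriteStream(filepath);

  pipeline(req, writeStream, (error) => {
    if (error) {
      fs.unlink(filepath, () => {}); // Remove the partial file
      return res.status(500).json({ error: "Upload failed" });
    }
    res.json({ success: true, filename: path.basename(filepath) });
  });
});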
2. Real-Time Log Processing System
const fs = require("fs");const { Transform } = require("stream");const EventEmitter = require("events");
class LogAnalyzer extends Transform { constructor() { super({ objectMode: true }); this.stats = { total: 0, errors: 0, warnings: 0, info: 0, }; }
_transform(line, encoding, callback) { const logEntry = this.parseLogLine(line.toString());
if (logEntry) { this.stats.total++; this.stats[logEntry.level.toLowerCase()]++;
// Emit alerts for critical errors if (logEntry.level === "ERROR" && logEntry.message.includes("CRITICAL")) { this.emit("critical-error", logEntry); }
this.push(logEntry); }
callback(); }
parseLogLine(line) { const match = line.match( /\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+): (.*)/ );
if (match) { return { timestamp: match[1], level: match[2], message: match[3], processedAt: new Date().toISOString(), }; }
return null; }
getStats() { return { ...this.stats }; }}
// Usageconst logAnalyzer = new LogAnalyzer();
// Set up alertinglogAnalyzer.on("critical-error", (errorLog) => { console.log("🚨 CRITICAL ERROR DETECTED:", errorLog.message); // Send to monitoring system, email alerts, etc.});
// Process log filefs.createReadStream("application.log") .pipe(logAnalyzer) .on("data", (logEntry) => { // Real-time log processing if (logEntry.level === "ERROR") { console.log("🔴 Error:", logEntry.message); } }) .on("end", () => { console.log("📊 Log Analysis Complete:"); console.log(logAnalyzer.getStats()); });
3. Data Export Pipeline
const { Readable, Transform, pipeline } = require("stream");
const fs = require("fs");

// Mock database stream
class DatabaseStream extends Readable {
  constructor(options) {
    super({ objectMode: true });
    this.currentId = 1;
    this.maxRecords = options.maxRecords || 1000;
    this.batchSize = options.batchSize || 100;
  }

  _read() {
    if (this.currentId > this.maxRecords) {
      this.push(null); // End stream
      return;
    }

    // Simulate database batch fetch
    setTimeout(() => {
      const batch = [];
      // maxRecords + 1 ensures the final record is included and the stream ends
      const endId = Math.min(this.currentId + this.batchSize, this.maxRecords + 1);

      for (let id = this.currentId; id < endId; id++) {
        batch.push({
          id,
          name: `User ${id}`,
          email: `user${id}@example.com`,
          createdAt: new Date().toISOString(),
        });
      }

      this.currentId = endId;
      this.push(batch);
    }, 10); // Simulate network delay
  }
}

// CSV formatter transform
class CSVFormatter extends Transform {
  constructor() {
    super({ objectMode: true });
    this.headerWritten = false;
  }

  _transform(batch, encoding, callback) {
    let output = "";

    if (!this.headerWritten && batch.length > 0) {
      const headers = Object.keys(batch[0]).join(",");
      output += headers + "\n";
      this.headerWritten = true;
    }

    for (const record of batch) {
      const values = Object.values(record).map((value) =>
        typeof value === "string" && value.includes(",") ? `"${value}"` : value
      );
      output += values.join(",") + "\n";
    }

    callback(null, output);
  }
}

// Export pipeline
const dbStream = new DatabaseStream({ maxRecords: 10000, batchSize: 500 });
const csvFormatter = new CSVFormatter();
const outputFile = fs.createWriteStream("users_export.csv");

console.log("🚀 Starting data export...");

pipeline(dbStream, csvFormatter, outputFile, (error) => {
  if (error) {
    console.error("❌ Export failed:", error);
  } else {
    console.log("✅ Export completed successfully");
  }
});
Performance Best Practices
1. Memory Management
// ✅ Good: Configure appropriate buffer sizes
const readStream = fs.createReadStream("large-file.txt", {
  highWaterMark: 64 * 1024, // 64KB chunks (good for most use cases)
});

const writeStream = fs.createWriteStream("output.txt", {
  highWaterMark: 16 * 1024, // 16KB buffer
});

// ✅ Good: Monitor memory usage
setInterval(() => {
  const memUsage = process.memoryUsage();
  console.log(`Memory: ${Math.round(memUsage.heapUsed / 1024 / 1024)}MB`);
}, 5000);
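You can also watch a stream's own buffer level: writableLength (bytes currently buffered) and writableHighWaterMark are standard properties on writable streams. A minimal sketch, with the file name and logging format chosen for illustration:

const fs = require("fs");

const monitoredStream = fs.createWriteStream("output.txt");

// Log how full the internal write buffer is
function logBufferLevel(stream) {
  const used = stream.writableLength;
  const limit = stream.writableHighWaterMark;
  console.log(`Write buffer: ${used}/${limit} bytes (${((used / limit) * 100).toFixed(1)}%)`);
}

monitoredStream.write("some data\n");
logBufferLevel(monitoredStream);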
2. Error Handling Patterns
const { pipeline } = require("stream");
// ✅ Good: Always use pipeline for complex chains
pipeline(
  sourceStream,
  transformStream1,
  transformStream2,
  destinationStream,
  (error) => {
    if (error) {
      console.error("Pipeline error:", error);
      // Cleanup logic here
    } else {
      console.log("Pipeline completed successfully");
    }
  }
);

// ❌ Bad: Manual piping without proper error handling
sourceStream
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(destinationStream); // Errors might not be handled properly
3. Backpressure Management
const fs = require("fs");
function writeWithProperBackpressure(stream, data) {
  return new Promise((resolve, reject) => {
    if (!stream.write(data)) {
      // Backpressure - wait for drain
      stream.once("drain", resolve);
      stream.once("error", reject);
    } else {
      // No backpressure - continue
      process.nextTick(resolve);
    }
  });
}

async function processLargeDataset(data) {
  const writeStream = fs.createWriteStream("output.txt");

  for (const item of data) {
    await writeWithProperBackpressure(writeStream, JSON.stringify(item) + "\n");
  }

  writeStream.end();
}
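When the data already lives in an array or generator, Readable.from() together with pipeline() gives you the same backpressure handling without a manual helper. A minimal sketch under that assumption (the dataset and file name are illustrative):

const { Readable, pipeline } = require("stream");
const fs = require("fs");

function exportDataset(data) {
  // Readable.from() turns any iterable or generator into a readable stream
  const source = Readable.from(
    (function* () {
      for (const item of data) {
        yield JSON.stringify(item) + "\n";
      }
    })()
  );

  // pipeline() pauses the source whenever the file stream signals backpressure
  pipeline(source, fs.createWriteStream("output.txt"), (error) => {
    if (error) console.error("Export failed:", error);
    else console.log("Export complete");
  });
}

exportDataset([{ id: 1 }, { id: 2 }]);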
4. Performance Monitoring
const { Transform } = require("stream");
class PerformanceMonitor extends Transform {
  constructor(options = {}) {
    super(options);
    this.bytesProcessed = 0;
    this.itemsProcessed = 0;
    this.startTime = Date.now();
    this.lastReport = this.startTime;
    this.lastReportBytes = 0;
  }

  _transform(chunk, encoding, callback) {
    this.bytesProcessed += chunk.length;
    this.itemsProcessed++;

    // Report every 10MB or 10 seconds
    const now = Date.now();
    if (
      this.bytesProcessed - this.lastReportBytes >= 10 * 1024 * 1024 ||
      now - this.lastReport > 10000
    ) {
      this.reportPerformance();
      this.lastReport = now;
      this.lastReportBytes = this.bytesProcessed;
    }

    callback(null, chunk);
  }

  reportPerformance() {
    const elapsed = (Date.now() - this.startTime) / 1000;
    const mbProcessed = this.bytesProcessed / (1024 * 1024);
    const throughput = mbProcessed / elapsed;

    console.log(`📊 Performance Report:`);
    console.log(`   Processed: ${mbProcessed.toFixed(2)} MB`);
    console.log(`   Items: ${this.itemsProcessed}`);
    console.log(`   Throughput: ${throughput.toFixed(2)} MB/s`);
    console.log(`   Duration: ${elapsed.toFixed(2)}s`);
  }
}

// Usage
inputStream
  .pipe(new PerformanceMonitor())
  .pipe(processingStream)
  .pipe(outputStream);
Common Patterns and Use Cases
File Processing
const fs = require("fs");const readline = require("readline");
// Process large text files line by lineasync function processLargeTextFile(filename) { const fileStream = fs.createReadStream(filename); const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity, });
let lineCount = 0; for await (const line of rl) { lineCount++; // Process each line console.log(`Line ${lineCount}: ${line.substring(0, 50)}...`); }
console.log(`✅ Processed ${lineCount} lines`);}
Network Streaming
const https = require("https");
const fs = require("fs");

function downloadFile(url, destination) {
  return new Promise((resolve, reject) => {
    const file = fs.createWriteStream(destination);

    https
      .get(url, (response) => {
        const totalSize = parseInt(response.headers["content-length"], 10);
        let downloadedSize = 0;

        response.on("data", (chunk) => {
          downloadedSize += chunk.length;
          const progress = ((downloadedSize / totalSize) * 100).toFixed(1);
          process.stdout.write(`\rDownloading... ${progress}%`);
        });

        response.pipe(file);

        file.on("finish", () => {
          file.close();
          console.log("\n✅ Download completed");
          resolve();
        });

        file.on("error", (error) => {
          fs.unlink(destination, () => {});
          reject(error);
        });
      })
      .on("error", reject);
  });
}
Data Validation Pipeline
const { Transform } = require("stream");
class DataValidator extends Transform {
  constructor(schema) {
    super({ objectMode: true });
    this.schema = schema;
    this.validCount = 0;
    this.invalidCount = 0;
  }

  _transform(data, encoding, callback) {
    if (this.validateData(data)) {
      this.validCount++;
      this.push(data);
    } else {
      this.invalidCount++;
      console.log("❌ Invalid data:", data);
    }
    callback();
  }

  validateData(data) {
    // Simple validation example
    return (
      data.email &&
      data.email.includes("@") &&
      data.name &&
      data.name.length > 0
    );
  }

  _flush(callback) {
    console.log(`\n📊 Validation Summary:`);
    console.log(`   Valid records: ${this.validCount}`);
    console.log(`   Invalid records: ${this.invalidCount}`);
    callback();
  }
}

// Usage
inputStream.pipe(new DataValidator()).pipe(outputStream);
Conclusion
Node.js streams are essential for building scalable, efficient applications. They enable you to:
- Handle large datasets without overwhelming system memory
- Build modular pipelines that are easy to test and maintain
- Process data in real-time as it becomes available
- Create responsive applications that don’t block the event loop
Master these stream patterns, and you’ll be able to build Node.js applications that can handle enterprise-scale data processing with elegance and efficiency.
Key Takeaways:
- Always use pipeline() for complex stream chains
- Handle errors properly to prevent crashes
- Respect backpressure to avoid memory issues
- Monitor performance in production applications
- Use appropriate buffer sizes for your use case