Streams

What are Node.js Streams?

Streams are one of Node.js’s most powerful features for handling data flow efficiently. Think of streams as conveyor belts in a factory - they process data piece by piece rather than waiting for the entire dataset to be available. This approach makes Node.js applications highly memory-efficient and responsive.

The Stream Advantage

Without Streams (Traditional Approach):

const fs = require("fs");
// ❌ Memory intensive - loads entire file into memory
const data = fs.readFileSync("10GB-file.txt");
console.log("File size:", data.length); // Waits for entire file to load

With Streams (Efficient Approach):

const fs = require("fs");
// ✅ Memory efficient - processes data in chunks
const stream = fs.createReadStream("10GB-file.txt");
let totalSize = 0;
stream.on("data", (chunk) => {
  totalSize += chunk.length;
  console.log(`Processed: ${chunk.length} bytes`); // Real-time processing
});
stream.on("end", () => {
  console.log("Total file size:", totalSize);
});

Key Benefits

  • Memory Efficiency: Process large files with minimal RAM usage
  • Performance: Start processing data before it’s fully available
  • Scalability: Handle thousands of concurrent operations
  • Composability: Chain operations together like UNIX pipes (see the sketch below)
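
As a quick sketch of that composability, the snippet below chains process.stdin through gzip into a file - the streaming equivalent of `cat | gzip > out.gz` in a shell (the output name stdin.gz is just an example):

const zlib = require("zlib");
const fs = require("fs");
// Each stage is a stream; pipe() connects them like a shell pipeline
process.stdin
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("stdin.gz"));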

Types of Streams

Node.js provides four fundamental stream types:

1. Readable Streams

Readable streams are data producers - they generate or provide data that can be consumed.

Common Examples:

  • File reading (fs.createReadStream())
  • HTTP request bodies (req in Express)
  • Process input (process.stdin)
  • Database query results

const fs = require("fs");
const readableStream = fs.createReadStream("data.txt", {
  encoding: "utf8",
  highWaterMark: 16 * 1024, // 16KB chunks
});
readableStream.on("data", (chunk) => {
  console.log("📥 Received chunk:", chunk.length, "bytes");
});
readableStream.on("end", () => {
  console.log("✅ Reading complete");
});
readableStream.on("error", (error) => {
  console.error("❌ Read error:", error);
});
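
Readable streams are also async iterables, so the same data.txt can be consumed with for await...of instead of 'data' listeners - a minimal alternative sketch:

const fs = require("fs");
async function readInChunks() {
  const stream = fs.createReadStream("data.txt", { encoding: "utf8" });
  let totalLength = 0;
  // for await...of pulls one chunk at a time and respects backpressure
  for await (const chunk of stream) {
    totalLength += chunk.length;
  }
  console.log("✅ Read", totalLength, "characters");
}
readInChunks().catch(console.error);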

2. Writable Streams

Writable streams are data consumers - they accept and process incoming data.

Common Examples:

  • File writing (fs.createWriteStream())
  • HTTP response bodies (res in Express)
  • Process output (process.stdout)
  • Database write operations

const fs = require("fs");
const writableStream = fs.createWriteStream("output.txt");
// Writing data
writableStream.write("Hello ");
writableStream.write("World!\n");
writableStream.write("Node.js Streams are powerful!");
// Finish writing
writableStream.end();
writableStream.on("finish", () => {
  console.log("✅ Writing complete");
});
writableStream.on("error", (error) => {
  console.error("❌ Write error:", error);
});

3. Duplex Streams

Duplex streams can both read and write data independently - like a telephone connection where both parties can speak and listen simultaneously.

Common Examples:

  • TCP sockets (net.Socket)
  • WebSocket connections
  • Child process stdio
  • Crypto cipher/decipher streams

const net = require("net");
// Create a TCP server with duplex streams
const server = net.createServer((socket) => {
  console.log("🔗 Client connected");
  // Write welcome message
  socket.write("Welcome to the echo server!\n");
  // Read incoming data and echo back
  socket.on("data", (data) => {
    console.log("📨 Received:", data.toString().trim());
    socket.write(`Echo: ${data}`); // Write back to same stream
  });
  socket.on("end", () => {
    console.log("👋 Client disconnected");
  });
});
server.listen(8080, () => {
  console.log("🚀 Echo server running on port 8080");
});
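
To exercise the duplex behaviour from the other side, a small client sketch (connecting to the same example port 8080) can write and read on the one socket:

const net = require("net");
const client = net.connect(8080, () => {
  client.write("Hello server!\n"); // Write side of the duplex stream
});
client.on("data", (data) => {
  console.log("📬 Server says:", data.toString().trim()); // Read side
  client.end();
});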

4. Transform Streams

Transform streams are special duplex streams that modify data as it passes through - like a filter or processor in a pipeline.

Common Examples:

  • Data compression (zlib.createGzip())
  • Encryption (crypto.createCipheriv())
  • Data parsing (JSON, CSV transformers)
  • Custom data processors

const { Transform } = require("stream");
const fs = require("fs");
// Custom transform stream
class JSONFormatter extends Transform {
  constructor() {
    super({ objectMode: true }); // Handle objects instead of buffers
  }
  _transform(chunk, encoding, callback) {
    try {
      // Assumes each chunk is one complete JSON document (fine for small files)
      const data = JSON.parse(chunk.toString());
      const formatted = {
        ...data,
        timestamp: new Date().toISOString(),
        processed: true,
      };
      callback(null, JSON.stringify(formatted, null, 2) + "\n");
    } catch (error) {
      callback(error);
    }
  }
}
// Usage example
const input = fs.createReadStream("raw-data.json");
const formatter = new JSONFormatter();
const output = fs.createWriteStream("formatted-data.json");
input.pipe(formatter).pipe(output);
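
For one-off transforms you don't need a full class; the Transform constructor also accepts a transform option directly. A minimal uppercase filter as a sketch:

const { Transform } = require("stream");
// Inline transform: uppercases whatever text flows through it
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});
process.stdin.pipe(upperCase).pipe(process.stdout);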

Stream Methods and Events

Essential Methods

Readable Stream Methods

const stream = fs.createReadStream("file.txt");
// Manual reading
const chunk = stream.read(1024); // Read up to 1KB
// Flow control
stream.pause(); // Pause data flow
stream.resume(); // Resume data flow
// Cleanup
stream.destroy(); // Destroy the stream

Writable Stream Methods

const stream = fs.createWriteStream("file.txt");
// Writing data
const success = stream.write("data"); // Returns false if backpressure
// Buffering control
stream.cork(); // Buffer writes
stream.uncork(); // Flush buffered writes
// Finishing
stream.end("final data"); // End with optional final chunk

Piping (Universal)

readableStream.pipe(writableStream); // Connect streams
readableStream.unpipe(writableStream); // Disconnect streams

Critical Events

For All Streams

  • 'error': Emitted when an error occurs
  • 'close': Stream has been closed

Readable Stream Events

  • 'data': New data is available
  • 'end': No more data will be provided
  • 'readable': Data is available to be read (pull-based pattern sketched below)
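
The 'readable' event powers the pull-based (paused) mode, where you call read() yourself instead of listening for 'data'. A minimal sketch against an example file.txt:

const fs = require("fs");
const stream = fs.createReadStream("file.txt", { encoding: "utf8" });
stream.on("readable", () => {
  let chunk;
  // Drain the internal buffer until read() returns null
  while ((chunk = stream.read()) !== null) {
    console.log("Pulled", chunk.length, "characters");
  }
});
stream.on("end", () => console.log("No more data"));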

Writable Stream Events

  • 'drain': Safe to write again after backpressure
  • 'finish': All data has been flushed
  • 'pipe': A readable stream has been piped to this writable

const fs = require("fs");
const writeStream = fs.createWriteStream("output.txt");
// Handle backpressure properly
function writeWithBackpressure(stream, data) {
  return new Promise((resolve, reject) => {
    const success = stream.write(data);
    if (success) {
      // No backpressure - can continue immediately
      resolve();
    } else {
      // Backpressure occurred - wait for drain, and remove whichever listener doesn't fire
      const onDrain = () => {
        stream.off("error", onError);
        resolve();
      };
      const onError = (error) => {
        stream.off("drain", onDrain);
        reject(error);
      };
      stream.once("drain", onDrain);
      stream.once("error", onError);
    }
  });
}
// Usage
async function writeLotsOfData() {
  for (let i = 0; i < 100000; i++) {
    await writeWithBackpressure(writeStream, `Line ${i}\n`);
  }
  writeStream.end();
}

Piping and Pipeline Patterns

Basic Piping

Piping connects streams together, automatically handling data flow and backpressure:

const fs = require("fs");
const zlib = require("zlib");
// Simple pipe chain
fs.createReadStream("input.txt")
  .pipe(zlib.createGzip()) // Compress
  .pipe(fs.createWriteStream("output.gz")); // Save compressed file
console.log("🔄 Compression started...");

Advanced Pipeline with Error Handling

const { pipeline } = require("stream");
const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");
// Derive a key and IV (crypto.createCipher is deprecated; createCipheriv is the supported API)
const key = crypto.scryptSync("secret-key", "salt", 32);
const iv = crypto.randomBytes(16); // Store the IV with the output so it can be decrypted later
// Robust pipeline with automatic error handling and cleanup
pipeline(
  fs.createReadStream("sensitive-data.txt"),
  zlib.createGzip(), // Compress
  crypto.createCipheriv("aes-256-cbc", key, iv), // Encrypt
  fs.createWriteStream("secure-backup.gz"),
  (error) => {
    if (error) {
      console.error("❌ Pipeline failed:", error);
    } else {
      console.log("✅ Data compressed, encrypted, and saved");
    }
  }
);
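
The same idea reads even more cleanly with the promise-based pipeline from stream/promises (available since Node.js 15); a simplified sketch with the encryption step omitted:

const { pipeline } = require("stream/promises");
const fs = require("fs");
const zlib = require("zlib");
async function compressFile() {
  // Resolves when every stream has finished, rejects on the first error
  await pipeline(
    fs.createReadStream("sensitive-data.txt"),
    zlib.createGzip(),
    fs.createWriteStream("backup.gz")
  );
  console.log("✅ Compression pipeline finished");
}
compressFile().catch((error) => console.error("❌ Pipeline failed:", error));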

Custom Transform Chains

const { Transform, pipeline } = require("stream");
const fs = require("fs");
// CSV to JSON transformer
class CSVToJSON extends Transform {
  constructor() {
    super({ objectMode: true });
    this.headers = null;
    this.remainder = ""; // Partial line carried over between chunks
  }
  _transform(chunk, encoding, callback) {
    // Prepend any partial line left over from the previous chunk
    const lines = (this.remainder + chunk.toString()).split("\n");
    this.remainder = lines.pop(); // Last piece may be incomplete
    for (const line of lines) {
      this.processLine(line);
    }
    callback();
  }
  _flush(callback) {
    if (this.remainder.trim()) {
      this.processLine(this.remainder);
    }
    callback();
  }
  processLine(line) {
    if (!line.trim()) return;
    const values = line.split(",");
    if (!this.headers) {
      this.headers = values;
    } else {
      const obj = {};
      this.headers.forEach((header, index) => {
        obj[header.trim()] = values[index]?.trim();
      });
      this.push(JSON.stringify(obj) + "\n");
    }
  }
}
// Data validator transformer
class DataValidator extends Transform {
  _transform(chunk, encoding, callback) {
    try {
      const data = JSON.parse(chunk.toString());
      // Validation logic
      if (data.email && data.email.includes("@")) {
        this.push(chunk); // Valid data
      }
      // Invalid data is filtered out
    } catch (error) {
      // Skip malformed JSON
    }
    callback();
  }
}
// Complete processing pipeline
pipeline(
  fs.createReadStream("users.csv"),
  new CSVToJSON(),
  new DataValidator(),
  fs.createWriteStream("valid-users.json"),
  (error) => {
    if (error) {
      console.error("Processing failed:", error);
    } else {
      console.log("✅ CSV converted and validated");
    }
  }
);

Real-World Applications

1. High-Performance File Upload Handler

const express = require("express");
const fs = require("fs");
const path = require("path");
const crypto = require("crypto");
const app = express();
app.post("/upload", (req, res) => {
  const uploadDir = "uploads";
  const filename = `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  const filepath = path.join(uploadDir, filename);
  // Ensure upload directory exists
  if (!fs.existsSync(uploadDir)) {
    fs.mkdirSync(uploadDir, { recursive: true });
  }
  // Create write stream and hash calculator
  const writeStream = fs.createWriteStream(filepath);
  const hash = crypto.createHash("sha256");
  let uploadSize = 0;
  req.on("data", (chunk) => {
    uploadSize += chunk.length;
    hash.update(chunk);
    writeStream.write(chunk); // Note: ignores backpressure; see the pipeline variant below
  });
  req.on("end", () => {
    writeStream.end();
    const fileHash = hash.digest("hex");
    res.json({
      success: true,
      filename,
      size: uploadSize,
      hash: fileHash,
      message: "Upload completed successfully",
    });
  });
  req.on("error", (error) => {
    writeStream.destroy();
    fs.unlink(filepath, () => {}); // Clean up partial file
    res.status(500).json({ error: "Upload failed" });
  });
});
app.listen(3000, () => {
  console.log("🚀 Upload server running on port 3000");
});
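
Since the manual 'data'/'end' wiring above ignores write-stream backpressure, a leaner variant can let pipeline handle flow control and cleanup. A sketch of a hypothetical /upload-piped route, reusing app, fs, and path from the example above:

const { pipeline } = require("stream");

app.post("/upload-piped", (req, res) => {
  const filepath = path.join("uploads", `${Date.now()}-upload`);
  const writeStream = fs.createWriteStream(filepath);
  // pipeline respects backpressure and destroys both streams on failure
  pipeline(req, writeStream, (error) => {
    if (error) {
      fs.unlink(filepath, () => {}); // Remove the partial file
      return res.status(500).json({ error: "Upload failed" });
    }
    res.json({ success: true, filename: path.basename(filepath) });
  });
});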

2. Real-Time Log Processing System

const fs = require("fs");
const { Transform } = require("stream");
class LogAnalyzer extends Transform {
  constructor() {
    super({ objectMode: true });
    this.remainder = ""; // Partial line carried over between chunks
    this.stats = {
      total: 0,
      errors: 0,
      warnings: 0,
      info: 0,
    };
  }
  _transform(chunk, encoding, callback) {
    // Split the chunk into whole lines, keeping any trailing partial line for later
    const lines = (this.remainder + chunk.toString()).split("\n");
    this.remainder = lines.pop();
    for (const line of lines) {
      this.analyzeLine(line);
    }
    callback();
  }
  _flush(callback) {
    if (this.remainder) {
      this.analyzeLine(this.remainder);
    }
    callback();
  }
  analyzeLine(line) {
    const logEntry = this.parseLogLine(line);
    if (!logEntry) return;
    this.stats.total++;
    // Map log levels onto the stats counters
    const counter = { error: "errors", warn: "warnings", warning: "warnings", info: "info" }[
      logEntry.level.toLowerCase()
    ];
    if (counter) this.stats[counter]++;
    // Emit alerts for critical errors
    if (logEntry.level === "ERROR" && logEntry.message.includes("CRITICAL")) {
      this.emit("critical-error", logEntry);
    }
    this.push(logEntry);
  }
  parseLogLine(line) {
    const match = line.match(
      /\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] (\w+): (.*)/
    );
    if (match) {
      return {
        timestamp: match[1],
        level: match[2],
        message: match[3],
        processedAt: new Date().toISOString(),
      };
    }
    return null;
  }
  getStats() {
    return { ...this.stats };
  }
}
// Usage
const logAnalyzer = new LogAnalyzer();
// Set up alerting
logAnalyzer.on("critical-error", (errorLog) => {
  console.log("🚨 CRITICAL ERROR DETECTED:", errorLog.message);
  // Send to monitoring system, email alerts, etc.
});
// Process log file
fs.createReadStream("application.log")
  .pipe(logAnalyzer)
  .on("data", (logEntry) => {
    // Real-time log processing
    if (logEntry.level === "ERROR") {
      console.log("🔴 Error:", logEntry.message);
    }
  })
  .on("end", () => {
    console.log("📊 Log Analysis Complete:");
    console.log(logAnalyzer.getStats());
  });

3. Data Export Pipeline

const { Readable, Transform, pipeline } = require("stream");
const fs = require("fs");
// Mock database stream
class DatabaseStream extends Readable {
  constructor(options) {
    super({ objectMode: true });
    this.currentId = 1;
    this.maxRecords = options.maxRecords || 1000;
    this.batchSize = options.batchSize || 100;
  }
  _read() {
    if (this.currentId > this.maxRecords) {
      this.push(null); // End stream
      return;
    }
    // Simulate database batch fetch
    setTimeout(() => {
      const batch = [];
      // endId is exclusive; the +1 keeps the final record inclusive
      const endId = Math.min(this.currentId + this.batchSize, this.maxRecords + 1);
      for (let id = this.currentId; id < endId; id++) {
        batch.push({
          id,
          name: `User ${id}`,
          email: `user${id}@example.com`,
          createdAt: new Date().toISOString(),
        });
      }
      this.currentId = endId;
      this.push(batch);
    }, 10); // Simulate network delay
  }
}
// CSV formatter transform
class CSVFormatter extends Transform {
  constructor() {
    super({ objectMode: true });
    this.headerWritten = false;
  }
  _transform(batch, encoding, callback) {
    let output = "";
    if (!this.headerWritten && batch.length > 0) {
      const headers = Object.keys(batch[0]).join(",");
      output += headers + "\n";
      this.headerWritten = true;
    }
    for (const record of batch) {
      const values = Object.values(record).map((value) =>
        typeof value === "string" && value.includes(",") ? `"${value}"` : value
      );
      output += values.join(",") + "\n";
    }
    callback(null, output);
  }
}
// Export pipeline
const dbStream = new DatabaseStream({ maxRecords: 10000, batchSize: 500 });
const csvFormatter = new CSVFormatter();
const outputFile = fs.createWriteStream("users_export.csv");
console.log("🚀 Starting data export...");
pipeline(dbStream, csvFormatter, outputFile, (error) => {
  if (error) {
    console.error("❌ Export failed:", error);
  } else {
    console.log("✅ Export completed successfully");
  }
});

Performance Best Practices

1. Memory Management

// ✅ Good: Configure appropriate buffer sizes
const readStream = fs.createReadStream("large-file.txt", {
  highWaterMark: 64 * 1024, // 64KB chunks (good for most use cases)
});
const writeStream = fs.createWriteStream("output.txt", {
  highWaterMark: 16 * 1024, // 16KB buffer
});
// ✅ Good: Monitor memory usage
setInterval(() => {
  const memUsage = process.memoryUsage();
  console.log(`Memory: ${Math.round(memUsage.heapUsed / 1024 / 1024)}MB`);
}, 5000);

2. Error Handling Patterns

const { pipeline } = require("stream");
// ✅ Good: Always use pipeline for complex chains
pipeline(
  sourceStream,
  transformStream1,
  transformStream2,
  destinationStream,
  (error) => {
    if (error) {
      console.error("Pipeline error:", error);
      // Cleanup logic here
    } else {
      console.log("Pipeline completed successfully");
    }
  }
);
// ❌ Bad: Manual piping without proper error handling
sourceStream
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(destinationStream); // Errors might not be handled properly
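
When a chain needs to be cancellable, the promise-based pipeline also accepts an AbortSignal. A sketch with a hypothetical 5-second timeout and example file names:

const { pipeline } = require("stream/promises");
const fs = require("fs");
const zlib = require("zlib");

async function compressWithTimeout() {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), 5000); // Cancel after 5 seconds
  try {
    await pipeline(
      fs.createReadStream("input.txt"),
      zlib.createGzip(),
      fs.createWriteStream("output.gz"),
      { signal: controller.signal } // Aborting destroys every stream in the chain
    );
    console.log("Pipeline completed successfully");
  } catch (error) {
    console.error("Pipeline aborted or failed:", error);
  } finally {
    clearTimeout(timer);
  }
}
compressWithTimeout();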

3. Backpressure Management

const fs = require("fs");
function writeWithProperBackpressure(stream, data) {
  return new Promise((resolve, reject) => {
    if (!stream.write(data)) {
      // Backpressure - wait for drain, removing whichever listener doesn't fire
      const onDrain = () => {
        stream.off("error", onError);
        resolve();
      };
      const onError = (error) => {
        stream.off("drain", onDrain);
        reject(error);
      };
      stream.once("drain", onDrain);
      stream.once("error", onError);
    } else {
      // No backpressure - continue
      process.nextTick(resolve);
    }
  });
}
async function processLargeDataset(data) {
  const writeStream = fs.createWriteStream("output.txt");
  for (const item of data) {
    await writeWithProperBackpressure(writeStream, JSON.stringify(item) + "\n");
  }
  writeStream.end();
}

4. Performance Monitoring

const { Transform } = require("stream");
class PerformanceMonitor extends Transform {
  constructor(options = {}) {
    super(options);
    this.bytesProcessed = 0;
    this.itemsProcessed = 0;
    this.startTime = Date.now();
    this.lastReport = this.startTime;
    this.lastReportBytes = 0;
  }
  _transform(chunk, encoding, callback) {
    this.bytesProcessed += chunk.length;
    this.itemsProcessed++;
    // Report every 10MB or 10 seconds, whichever comes first
    const now = Date.now();
    if (
      this.bytesProcessed - this.lastReportBytes >= 10 * 1024 * 1024 ||
      now - this.lastReport > 10000
    ) {
      this.reportPerformance();
      this.lastReport = now;
      this.lastReportBytes = this.bytesProcessed;
    }
    callback(null, chunk);
  }
  reportPerformance() {
    const elapsed = (Date.now() - this.startTime) / 1000;
    const mbProcessed = this.bytesProcessed / (1024 * 1024);
    const throughput = mbProcessed / elapsed;
    console.log(`📊 Performance Report:`);
    console.log(`  Processed: ${mbProcessed.toFixed(2)} MB`);
    console.log(`  Items: ${this.itemsProcessed}`);
    console.log(`  Throughput: ${throughput.toFixed(2)} MB/s`);
    console.log(`  Duration: ${elapsed.toFixed(2)}s`);
  }
}
// Usage
inputStream
  .pipe(new PerformanceMonitor())
  .pipe(processingStream)
  .pipe(outputStream);

Common Patterns and Use Cases

File Processing

const fs = require("fs");
const readline = require("readline");
// Process large text files line by line
async function processLargeTextFile(filename) {
  const fileStream = fs.createReadStream(filename);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });
  let lineCount = 0;
  for await (const line of rl) {
    lineCount++;
    // Process each line
    console.log(`Line ${lineCount}: ${line.substring(0, 50)}...`);
  }
  console.log(`✅ Processed ${lineCount} lines`);
}

Network Streaming

const https = require("https");
const fs = require("fs");
function downloadFile(url, destination) {
  return new Promise((resolve, reject) => {
    const file = fs.createWriteStream(destination);
    https
      .get(url, (response) => {
        if (response.statusCode !== 200) {
          file.close();
          fs.unlink(destination, () => {});
          return reject(new Error(`Request failed with status ${response.statusCode}`));
        }
        const totalSize = parseInt(response.headers["content-length"], 10);
        let downloadedSize = 0;
        response.on("data", (chunk) => {
          downloadedSize += chunk.length;
          // content-length may be missing; only show a percentage when the total is known
          const progress = totalSize
            ? `${((downloadedSize / totalSize) * 100).toFixed(1)}%`
            : `${downloadedSize} bytes`;
          process.stdout.write(`\rDownloading... ${progress}`);
        });
        response.pipe(file);
        file.on("finish", () => {
          file.close();
          console.log("\n✅ Download completed");
          resolve();
        });
        file.on("error", (error) => {
          fs.unlink(destination, () => {});
          reject(error);
        });
      })
      .on("error", reject);
  });
}

Data Validation Pipeline

const { Transform } = require("stream");
class DataValidator extends Transform {
  constructor(schema) {
    super({ objectMode: true });
    this.schema = schema; // Placeholder for more elaborate, schema-driven rules
    this.validCount = 0;
    this.invalidCount = 0;
  }
  _transform(data, encoding, callback) {
    if (this.validateData(data)) {
      this.validCount++;
      this.push(data);
    } else {
      this.invalidCount++;
      console.log("❌ Invalid data:", data);
    }
    callback();
  }
  validateData(data) {
    // Simple validation example
    return (
      data.email &&
      data.email.includes("@") &&
      data.name &&
      data.name.length > 0
    );
  }
  _flush(callback) {
    console.log(`\n📊 Validation Summary:`);
    console.log(`  Valid records: ${this.validCount}`);
    console.log(`  Invalid records: ${this.invalidCount}`);
    callback();
  }
}
// Usage
inputStream.pipe(new DataValidator()).pipe(outputStream);

Conclusion

Node.js streams are essential for building scalable, efficient applications. They enable you to:

  • Handle large datasets without overwhelming system memory
  • Build modular pipelines that are easy to test and maintain
  • Process data in real-time as it becomes available
  • Create responsive applications that don’t block the event loop

Master these stream patterns, and you’ll be able to build Node.js applications that can handle enterprise-scale data processing with elegance and efficiency.

Key Takeaways:

  • Always use pipeline() for complex stream chains
  • Handle errors properly to prevent crashes
  • Respect backpressure to avoid memory issues
  • Monitor performance in production applications
  • Use appropriate buffer sizes for your use case