Processing gigabytes of transactional logs without V8 heap saturation — Buffer internals, Transform stream pipelines, and zero-copy ingestion.
Module 5 — Native Streams & Off-Heap Buffer Storage
What this module covers: Processing gigabytes of blockchain transaction logs or UPI settlement files without crashing the V8 runtime requires understanding the boundary between V8-managed memory and memory that lives outside it. A Node.js Buffer stores its bytes outside V8's heap. Only a small JavaScript wrapper object lives on the heap, so the bytes are never scanned or moved by the garbage collector and do not count against the heap limit. Custom Transform streams process that memory in chunks, keeping your heap stable regardless of input size. This module covers the memory model for off-heap storage, the correct implementation of Transform streams for production ingestion pipelines, and streaming pipeline composition with pipeline().
The Memory Boundary: V8 Heap vs Off-Heap
In Module 1 you learned that V8 manages its own heap — New Space for short-lived objects, Old Space for long-lived ones. All standard JavaScript objects live here: arrays, plain objects, strings, closures.
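Node.js adds a second memory region: off-heap (external) memory. Node.js allocates it through V8's ArrayBuffer allocator (ultimately malloc/calloc) rather than inside the JavaScript heap. The GC never scans or compacts these bytes, and they do not count toward --max-old-space-size. V8 is still told how much external memory exists, though. process.memoryUsage() reports the total as external and arrayBuffers, a growing total can prompt V8 to run a GC sooner, and the memory is freed only when the JavaScript object that owns it is collected.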
This distinction is critical for ingestion pipelines:
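V8 Heap:
- Managed by V8
- Every allocation here adds GC pressure
- Limit: --max-old-space-size (the default depends on the Node.js version and available system memory)
- Objects: {}, [], strings, closures
Off-Heap (Node.js Buffer):
- Allocated outside the JS heap (malloc/calloc via the ArrayBuffer allocator)
- Bytes are never scanned or moved by the GC; freed when the owning Buffer is collected
- Limit: available system RAM (plus a per-Buffer cap, buffer.constants.MAX_LENGTH)
- Objects: raw binary data only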
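When you process a 500MB blockchain block file with Buffer, V8's heap stays flat even though the process's resident memory grows by roughly 500MB. The GC never scans the bytes themselves. The only heap allocation is a small Buffer JavaScript object that wraps the off-heap memory. The heap grows only when you convert that data into JavaScript values, for example with buf.toString() or JSON.parse().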
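You can observe this split directly by comparing process.memoryUsage() before and after a large allocation. This is a minimal sketch. It assumes a Node.js version recent enough to report the arrayBuffers field, and the exact deltas vary by version and platform.

```javascript
// Sketch: where a Buffer's memory shows up in process.memoryUsage()
const MB = 1024 * 1024;

const before = process.memoryUsage();
const block = Buffer.alloc(64 * MB); // 64MB of zero-filled bytes outside the V8 heap
const after = process.memoryUsage();

// heapUsed tracks JavaScript objects; arrayBuffers tracks Buffer/ArrayBuffer backing stores
const heapDeltaMB = (after.heapUsed - before.heapUsed) / MB;
const arrayBuffersDeltaMB = (after.arrayBuffers - before.arrayBuffers) / MB;

console.log(`Buffer length:      ${block.length} bytes`);
console.log(`heapUsed delta:     ${heapDeltaMB.toFixed(2)} MB`); // small: just the wrapper object
console.log(`arrayBuffers delta: ${arrayBuffersDeltaMB.toFixed(2)} MB`); // roughly 64 MB
```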
Buffer Internals: Three Allocation Methods
Buffer.alloc(size)
```javascript
const buf = Buffer.alloc(65536); // 64KB, zero-filled
```
Allocates size bytes of off-heap memory and zeros it. Safe for security-sensitive operations where you cannot allow previous memory contents to be readable. Slower due to the memset.
Use when: the buffer will be read before being fully written (zero prevents reading stale data), or the data is security-sensitive.
Buffer.allocUnsafe(size)
```javascript
const buf = Buffer.allocUnsafe(65536); // 64KB, NOT zero-filled
```
Allocates size bytes of off-heap memory without zeroing. The memory may contain previous data from other allocations. Faster — no memset.
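Small allocations are pooled. When size is less than half of Buffer.poolSize (8192 bytes by default, so under 4KB), allocUnsafe returns a slice of a pre-allocated 8KB slab managed by Node.js's buffer module. Many small Buffer.allocUnsafe calls share the same underlying ArrayBuffer, which cuts per-allocation overhead. Larger requests get their own backing store.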
Use when: you will immediately write to the buffer before reading, and the data is not security-sensitive. Most ingestion pipelines fall into this category — you write parsed transaction data into the buffer immediately.
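This short sketch makes the pool visible through documented Buffer properties (buffer, byteOffset) and Buffer.poolSize. Which sizes are pooled is a Node.js implementation detail. Treat the printed values as illustrative of current releases rather than a guarantee.

```javascript
// Sketch: small allocUnsafe() calls are sliced from a shared pool; large ones are not
const small = Buffer.allocUnsafe(128);       // below Buffer.poolSize / 2, so served from the pool
const large = Buffer.allocUnsafe(64 * 1024); // too big for the pool: gets its own ArrayBuffer

console.log(Buffer.poolSize);         // 8192 (unless the application changed it)
console.log(small.buffer.byteLength); // 8192: the whole pool slab, not 128
console.log(small.byteOffset);        // where this slice starts inside the slab
console.log(large.buffer.byteLength); // 65536: a dedicated backing store

// Always write before reading: pooled bytes may still hold data from earlier allocations
small.fill(0x20);
```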
Buffer.allocUnsafeSlow(size)
```javascript
const buf = Buffer.allocUnsafeSlow(4 * 1024 * 1024); // 4MB, NOT zero-filled
```
Allocates size bytes of off-heap memory without zeroing, and never uses the shared pool: each call gets its own dedicated backing store. For small sizes this is slower than allocUnsafe. For sizes of Buffer.poolSize / 2 and above the two behave the same, because allocUnsafe skips the pool for those sizes anyway.
Use when: you need to keep a small buffer alive for a long time (a cache entry, a long-lived scratch buffer). A small pooled slice that is retained indefinitely keeps its entire 8KB slab alive; allocUnsafeSlow avoids pinning the slab.
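```javascript
// Decision tree:
// - Read before fully written, or security-sensitive?  → Buffer.alloc()
// - Written immediately, short-lived?                  → Buffer.allocUnsafe() (pooled below 4KB)
// - Small buffer retained long-term?                   → Buffer.allocUnsafeSlow()
// - Large buffer (≥ 4KB)? allocUnsafe() and allocUnsafeSlow() both get a dedicated backing store
```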
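The same choices can be wrapped in a small helper. allocateBuffer, retainCopy, and their option names are hypothetical, made up for illustration; only the three Buffer factory methods and copy() are real API. retainCopy follows the pattern the Node.js documentation suggests for keeping a small piece of data long-term: allocate with allocUnsafeSlow(), then copy into it.

```javascript
// Hypothetical helper that encodes the allocation decision tree.
// zero:   bytes may be read before being written, or the data is sensitive
// retain: a small buffer that will be kept alive for a long time
function allocateBuffer(size, { zero = false, retain = false } = {}) {
  if (zero) return Buffer.alloc(size);             // zero-filled: never exposes old memory
  if (retain) return Buffer.allocUnsafeSlow(size); // own backing store: never pins a pool slab
  return Buffer.allocUnsafe(size);                 // fastest: pooled when small, write before reading
}

// Hypothetical helper: keep a long-lived copy of a (possibly pooled) slice
function retainCopy(slice) {
  const owned = Buffer.allocUnsafeSlow(slice.length);
  slice.copy(owned, 0, 0, slice.length);
  return owned;
}

const header = allocateBuffer(16, { zero: true });
const scratch = allocateBuffer(32).fill(7);          // write before read
const cacheKey = retainCopy(scratch.subarray(0, 8)); // detached from the shared slab

console.log(header.every((byte) => byte === 0)); // true
console.log(scratch.buffer.byteLength);          // 8192: a slice of the shared pool
console.log(cacheKey.buffer.byteLength);         // 8: its own backing store
```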
Buffer vs TypedArray vs ArrayBuffer
```javascript
// ArrayBuffer: a fixed-size block of memory. No methods to read/write it directly.
const ab = new ArrayBuffer(1024);

// TypedArray: a typed view into an ArrayBuffer. The view object lives in the V8 heap,
// but the underlying ArrayBuffer data may be off-heap depending on size.
const u8 = new Uint8Array(ab);    // view into ab
const f64 = new Float64Array(ab); // another view, same memory

// Buffer: Node.js's subclass of Uint8Array. Always off-heap for large allocations.
// Has convenience methods: readUInt32BE, writeFloatLE, copy, etc.
const buf = Buffer.from(ab); // wraps the existing ArrayBuffer as a Buffer (no copy)
```
For ingestion pipelines: use Buffer. It has the convenience methods (read/write integers at offsets, copy, compare) needed for binary protocol parsing. TypedArray is better for numeric computation where V8's typed array optimization applies.
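To see that these are views over one block of memory, the sketch below writes through a Float64Array and reads the same bytes back through a Buffer. Typed arrays use the platform's byte order, so the sketch checks os.endianness() before choosing readDoubleLE() or readDoubleBE(). Buffer's own read/write methods take an explicit byte order, which is why they suit wire formats.

```javascript
import { endianness } from 'node:os';

const ab = new ArrayBuffer(16);
const f64 = new Float64Array(ab); // typed view, platform byte order
const buf = Buffer.from(ab);      // Buffer view over the same memory, no copy

f64[0] = 1234.5; // write through the typed view
const viaBuffer = endianness() === 'LE' ? buf.readDoubleLE(0) : buf.readDoubleBE(0);
console.log(viaBuffer); // 1234.5

buf.writeUInt32BE(0xdeadbeef, 8);              // explicit big-endian write at offset 8
console.log(buf.readUInt32BE(8).toString(16)); // deadbeef
console.log(buf.buffer === ab);                // true: same backing store
```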
Transform Streams: The Core of Every Ingestion Pipeline
A Transform stream is both a Readable and a Writable. It receives data chunks in, transforms them, and emits transformed chunks out. It is the correct abstraction for:
- Decoding raw binary transaction data into JavaScript objects
- Parsing line-delimited JSON from a log file
- Decompressing compressed data streams
- Normalizing inconsistent data formats
The _transform Method
```javascript
import { Transform } from 'node:stream';

const LENGTH_PREFIX_SIZE = 4; // each transaction is framed as [uint32 BE length][payload]

class TransactionDecoder extends Transform {
  #buffer = Buffer.allocUnsafe(0); // accumulate incomplete chunks

  constructor() {
    super({
      readableObjectMode: true, // emit objects downstream
      // writableObjectMode: false (default): receives raw Buffers
    });
  }

  _transform(chunk, encoding, callback) {
    // Append incoming chunk to our accumulation buffer
    this.#buffer = this.#buffer.length === 0 ? chunk : Buffer.concat([this.#buffer, chunk]);

    try {
      // Parse as many complete transactions as possible
      while (this.#buffer.length >= LENGTH_PREFIX_SIZE) {
        const txLength = this.#buffer.readUInt32BE(0); // first 4 bytes = payload length
        if (this.#buffer.length < LENGTH_PREFIX_SIZE + txLength) break; // incomplete transaction

        const txBuffer = this.#buffer.subarray(LENGTH_PREFIX_SIZE, LENGTH_PREFIX_SIZE + txLength);
        // decodeTransaction: your format-specific decoder (not shown)
        this.push(decodeTransaction(txBuffer)); // emit decoded transaction downstream

        // Advance buffer past this transaction (subarray is a view, no copy)
        this.#buffer = this.#buffer.subarray(LENGTH_PREFIX_SIZE + txLength);
      }
      callback(); // signal: ready for more input
    } catch (err) {
      callback(err); // a malformed transaction fails the stream instead of crashing the process
    }
  }

  _flush(callback) {
    // Called when the writable side ends and every chunk has been transformed.
    // Leftover bytes mean the input stopped in the middle of a transaction.
    if (this.#buffer.length > 0) {
      callback(new Error(`Truncated input: ${this.#buffer.length} bytes of an incomplete transaction`));
      return;
    }
    callback();
  }
}
```
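The following self-contained sketch runs the same length-prefixed reassembly end to end. It assumes a hypothetical wire format (a 4-byte big-endian length followed by a UTF-8 JSON payload) in place of the real transaction encoding. LengthPrefixedJsonDecoder and encodeFrame are illustrative names. The input is deliberately split at awkward offsets so that both frames straddle chunk boundaries.

```javascript
import { Transform, Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical frame format: [uint32 BE length][UTF-8 JSON payload]
class LengthPrefixedJsonDecoder extends Transform {
  #pending = Buffer.alloc(0);

  constructor() {
    super({ readableObjectMode: true });
  }

  _transform(chunk, _encoding, callback) {
    this.#pending = this.#pending.length ? Buffer.concat([this.#pending, chunk]) : chunk;
    try {
      while (this.#pending.length >= 4) {
        const len = this.#pending.readUInt32BE(0);
        if (this.#pending.length < 4 + len) break; // wait for the rest of this frame
        this.push(JSON.parse(this.#pending.subarray(4, 4 + len).toString('utf8')));
        this.#pending = this.#pending.subarray(4 + len);
      }
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _flush(callback) {
    callback(this.#pending.length ? new Error(`truncated frame: ${this.#pending.length} bytes`) : null);
  }
}

function encodeFrame(obj) {
  const body = Buffer.from(JSON.stringify(obj), 'utf8');
  const header = Buffer.allocUnsafe(4);
  header.writeUInt32BE(body.length, 0); // written before it is ever read
  return Buffer.concat([header, body]);
}

// Two frames, split into chunks that cut through both of them
const wire = Buffer.concat([
  encodeFrame({ txId: 'a1', amount: 100 }),
  encodeFrame({ txId: 'b2', amount: 250 }),
]);
const chunks = [wire.subarray(0, 3), wire.subarray(3, 20), wire.subarray(20)];

const decoded = [];
await pipeline(
  Readable.from(chunks),
  new LengthPrefixedJsonDecoder(),
  async function collect(source) {
    for await (const tx of source) decoded.push(tx);
  },
);
console.log(decoded); // two transaction objects: a1 (100) and b2 (250)
```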
The _flush Method
_flush is called when the upstream has ended and there is no more data coming. It is your chance to emit any final output from accumulated state. Always call callback() — forgetting it hangs the pipeline.
```javascript
import { Transform } from 'node:stream';

// Example: batch emitter that holds items until the batch is full,
// then flushes the remaining items at the end of the stream
class BatchTransform extends Transform {
  #batch = [];
  #batchSize;

  constructor(batchSize = 100) {
    super({ readableObjectMode: true, writableObjectMode: true });
    this.#batchSize = batchSize;
  }

  _transform(item, _, callback) {
    this.#batch.push(item);
    if (this.#batch.length >= this.#batchSize) {
      this.push(this.#batch.splice(0)); // emit full batch
    }
    callback();
  }

  _flush(callback) {
    if (this.#batch.length > 0) {
      this.push(this.#batch.splice(0)); // emit partial final batch
    }
    callback();
  }
}
```
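For comparison, the same batching can be written as an async generator stage for pipeline(). The code after the loop plays the role of _flush(). batchOf is a hypothetical helper name, and this is a sketch rather than a drop-in replacement for BatchTransform.

```javascript
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: same batching behaviour as BatchTransform, as a pipeline stage
function batchOf(size) {
  return async function* batch(source) {
    let current = [];
    for await (const item of source) {
      current.push(item);
      if (current.length >= size) {
        yield current; // emit a full batch
        current = [];
      }
    }
    if (current.length > 0) yield current; // partial final batch, like _flush()
  };
}

const batchSizes = [];
await pipeline(
  Readable.from(Array.from({ length: 250 }, (_, i) => ({ txId: `tx-${i}` }))),
  batchOf(100),
  async function collect(source) {
    for await (const batch of source) batchSizes.push(batch.length);
  },
);
console.log(batchSizes); // [ 100, 100, 50 ]
```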
pipeline() vs .pipe(): Always Use pipeline()
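.pipe() has a serious flaw: it does not propagate errors. If a stream in the chain errors, the error is not forwarded along the chain and the other streams are not destroyed. Every stream needs its own 'error' handler, and open file descriptors and other resources leak.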
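```javascript
// BAD: .pipe(): streams leak on error
readStream
  .pipe(decompressStream)
  .pipe(parseStream)
  .pipe(writeStream);
// If parseStream errors: it is unpiped, but readStream and decompressStream
// are never destroyed, so the file descriptor stays open, and writeStream is
// never closed. Without an 'error' listener on parseStream, the error is also
// thrown as an uncaught exception.
```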
```javascript
// GOOD: pipeline(): proper error handling and cleanup
import { pipeline } from 'node:stream/promises';

await pipeline(
  readStream,
  decompressStream,
  parseStream,
  writeStream
);
// If ANY stream in the chain errors:
// - All other streams are automatically destroyed
// - The Promise rejects with the error
// - Resources are cleaned up
```
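This small sketch confirms the cleanup behaviour. The source is an endless generator, so the pipeline can only settle if pipeline() destroys it when a later stage fails. The stream names and the failure condition are made up for the demonstration.

```javascript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// An endless source: if pipeline() did not destroy it, it would never stop
const source = Readable.from((function* endless() {
  let n = 0;
  while (true) yield n++;
})());

// A stage that fails on the fourth record
const validator = new Transform({
  objectMode: true,
  transform(record, _encoding, callback) {
    if (record === 3) callback(new Error(`bad record ${record}`));
    else callback(null, record);
  },
});

// A sink that accepts everything
const sink = new Writable({
  objectMode: true,
  write(_record, _encoding, callback) {
    callback();
  },
});

let failure = null;
try {
  await pipeline(source, validator, sink);
} catch (err) {
  failure = err;
}

console.log(failure.message);                                   // bad record 3
console.log(source.destroyed, validator.destroyed, sink.destroyed); // true true true
```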
pipeline() with Async Generators
The most flexible form of pipeline() uses async generators as transform stages:
```javascript
import { pipeline } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
import { createGunzip } from 'node:zlib';
import { StringDecoder } from 'node:string_decoder';

// Process a 2GB compressed blockchain transaction log
await pipeline(
  // Stage 1: Read file as chunks
  createReadStream('transactions-2026-05-17.log.gz'),

  // Stage 2: Decompress
  createGunzip(),

  // Stage 3: Split by newline using an async generator
  async function* splitLines(source) {
    const decoder = new StringDecoder('utf8'); // keeps characters split across chunks intact
    let partial = '';
    for await (const chunk of source) {
      const lines = (partial + decoder.write(chunk)).split('\n');
      partial = lines.pop(); // last line may be incomplete
      for (const line of lines) {
        if (line.trim()) yield line;
      }
    }
    partial += decoder.end();
    if (partial.trim()) yield partial; // emit final line
  },

  // Stage 4: Parse each line as JSON
  async function* parseJSON(source) {
    for await (const line of source) {
      try {
        yield JSON.parse(line);
      } catch {
        console.warn(`Skipping malformed line: ${line.slice(0, 100)}`);
      }
    }
  },

  // Stage 5: Write to database in batches (db is your database client, not shown)
  async function* writeToDb(source) {
    const BATCH_SIZE = 500;
    let batch = [];
    for await (const transaction of source) {
      batch.push(transaction);
      if (batch.length >= BATCH_SIZE) {
        await db.bulkInsert(batch);
        yield batch.length; // emit count for progress tracking
        batch = [];
      }
    }
    if (batch.length > 0) {
      await db.bulkInsert(batch);
      yield batch.length;
    }
  },

  // Stage 6: Track progress. As the final stage (a sink) this is a plain async function.
  async function trackProgress(source) {
    let total = 0;
    for await (const count of source) {
      total += count;
      if (total % 100_000 === 0) { // full batches of 500 land exactly on each 100,000
        console.log(`Processed ${total.toLocaleString()} transactions`);
      }
    }
    console.log(`Complete: ${total.toLocaleString()} transactions`);
  }
);
// Memory throughout this pipeline is bounded by the streams' highWaterMark settings,
// the batch size, and the longest single line, regardless of the 2GB input size
```
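Decoding each chunk on its own with chunk.toString('utf8') can corrupt a multi-byte UTF-8 character that straddles two chunks. StringDecoder from node:string_decoder holds back the incomplete bytes until the next chunk arrives. This minimal sketch uses the rupee sign, which takes three bytes in UTF-8.

```javascript
import { StringDecoder } from 'node:string_decoder';

const text = 'amount=₹500\n';
const bytes = Buffer.from(text, 'utf8');
const cut = bytes.indexOf(0xe2) + 1; // split inside the 3-byte '₹' sequence (E2 82 B9)
const chunks = [bytes.subarray(0, cut), bytes.subarray(cut)];

// Naive: decode each chunk independently
const naive = chunks.map((c) => c.toString('utf8')).join('');

// StringDecoder carries incomplete sequences over to the next write()
const decoder = new StringDecoder('utf8');
const safe = chunks.map((c) => decoder.write(c)).join('') + decoder.end();

console.log(naive === text); // false: the split character became U+FFFD replacement characters
console.log(safe === text);  // true
```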
Off-Heap Buffer Pooling for Zero-Allocation Hot Paths
When your hot path processes millions of transactions per second, even the overhead of Buffer.allocUnsafe() accumulates. Pre-allocate a pool of reusable buffers:
```javascript
// Buffer pool for high-frequency binary processing
class BufferPool {
  #free = [];
  #bufferSize;
  #maxPoolSize;

  constructor({ bufferSize = 65536, maxPoolSize = 100 } = {}) {
    this.#bufferSize = bufferSize;
    this.#maxPoolSize = maxPoolSize;
    // Pre-allocate every buffer up front
    for (let i = 0; i < maxPoolSize; i++) {
      this.#free.push(Buffer.allocUnsafeSlow(bufferSize));
    }
  }

  acquire() {
    return this.#free.pop() ?? Buffer.allocUnsafeSlow(this.#bufferSize);
  }

  release(buf) {
    if (this.#free.length < this.#maxPoolSize) {
      this.#free.push(buf);
    }
    // If the pool is full, the buffer is simply dropped: its small JS wrapper is
    // garbage-collected and the off-heap memory is freed along with it
  }
}

const blockBufferPool = new BufferPool({ bufferSize: 5 * 1024 * 1024, maxPoolSize: 20 });

// Hot path: no per-block buffer allocation (the stream's own chunks are still allocated)
function processBlock(rawBlockStream) {
  const buf = blockBufferPool.acquire();
  let offset = 0;
  let settled = false;

  return new Promise((resolve, reject) => {
    // Release the pooled buffer exactly once, however the stream ends
    const finish = (err, result) => {
      if (settled) return;
      settled = true;
      blockBufferPool.release(buf); // so result must not keep views into buf
      if (err) reject(err);
      else resolve(result);
    };

    rawBlockStream.on('data', (chunk) => {
      if (offset + chunk.length > buf.length) {
        // copy() would silently truncate; fail loudly instead
        rawBlockStream.destroy(new Error(`Block exceeds pooled buffer size (${buf.length} bytes)`));
        return;
      }
      chunk.copy(buf, offset); // copy into pooled buffer: no new allocation
      offset += chunk.length;
    });

    rawBlockStream.on('end', () => {
      try {
        finish(null, parseBlock(buf.subarray(0, offset))); // parseBlock: your decoder (not shown)
      } catch (err) {
        finish(err);
      }
    });

    rawBlockStream.on('error', (err) => finish(err));
  });
}
```
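Any buffer pool carries one hazard: views created with subarray() keep pointing at a buffer's memory after it has been released and handed out again. This sketch uses a hypothetical two-function pool (acquire/release) rather than the BufferPool class, just to show the aliasing and the copy that avoids it. This is why whatever parseBlock() returns should not hold views into the pooled buffer.

```javascript
// Hypothetical minimal pool, just enough to show reuse and aliasing
const free = [];
const acquire = () => free.pop() ?? Buffer.allocUnsafeSlow(64);
const release = (buf) => { free.push(buf); };

const first = acquire();
first.write('TX-0001');
const view = first.subarray(0, 7);               // shares first's memory
const owned = Buffer.from(first.subarray(0, 7)); // copies the bytes
release(first);

const second = acquire(); // the pool hands the same memory back out
second.write('TX-9999');

console.log(second === first);        // true
console.log(view.toString('ascii'));  // TX-9999: the old view now sees the new data
console.log(owned.toString('ascii')); // TX-0001: the copy is unaffected
```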
Processing Multi-Gigabyte Settlement Files: A Complete Example
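UPI settlement files are batch files containing all payment transactions for a settlement period. A large payment processor might generate 2–5GB settlement files every hour. Processing these requires streaming. Reading a whole file into memory would need 2–5GB of RAM before parsing even starts. Materialising every record as a JavaScript object at once would put a comparable or larger amount on the V8 heap, well beyond typical --max-old-space-size settings.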
```javascript
// Process a 3GB UPI settlement file with bounded memory usage
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGunzip, createGzip } from 'node:zlib';
import { Transform } from 'node:stream';

// Custom Transform: parses the settlement file's fixed-width record format
class SettlementParser extends Transform {
  static RECORD_SIZE = 256; // each record is exactly 256 bytes
  #partial = Buffer.allocUnsafe(0);

  constructor() {
    super({ readableObjectMode: true });
  }

  _transform(chunk, _, callback) {
    const combined = this.#partial.length === 0 ? chunk : Buffer.concat([this.#partial, chunk]);
    const completeRecords = Math.floor(combined.length / SettlementParser.RECORD_SIZE);

    for (let i = 0; i < completeRecords; i++) {
      const record = combined.subarray(
        i * SettlementParser.RECORD_SIZE,
        (i + 1) * SettlementParser.RECORD_SIZE
      );
      this.push(this.#parseRecord(record));
    }

    // Keep incomplete trailing bytes for the next chunk
    const consumed = completeRecords * SettlementParser.RECORD_SIZE;
    this.#partial = combined.subarray(consumed);
    callback();
  }

  _flush(callback) {
    // _transform always consumes every complete record, so anything left here
    // is a truncated final record: report it instead of dropping it silently
    if (this.#partial.length > 0) {
      callback(new Error(`Truncated settlement file: ${this.#partial.length} trailing bytes`));
      return;
    }
    callback();
  }

  #parseRecord(buf) {
    return {
      txId: buf.toString('ascii', 0, 32).trim(),
      amount: buf.readBigInt64BE(32), // 8 bytes, big-endian, returned as a BigInt
      senderId: buf.toString('ascii', 40, 72).trim(),
      recipientId: buf.toString('ascii', 72, 104).trim(),
      timestamp: buf.readUInt32BE(104), // Unix seconds
      status: buf.readUInt8(108), // 1=success, 2=failed, 3=pending
    };
  }
}

// JSON.stringify() throws on BigInt values, so serialise them as strings
const bigintReplacer = (_key, value) => (typeof value === 'bigint' ? value.toString() : value);

// Process the settlement file; memory stays bounded
async function processSettlementFile(inputPath, outputPath) {
  let processed = 0, successful = 0, failed = 0;

  await pipeline(
    createReadStream(inputPath, { highWaterMark: 4 * 1024 * 1024 }), // 4MB read chunks
    createGunzip(),
    new SettlementParser(),
    async function* reconcile(source) {
      for await (const record of source) {
        processed++;
        const enriched = await enrichFromDatabase(record); // your lookup (not shown)
        if (enriched.status === 1) successful++;
        else failed++;
        yield enriched;
      }
    },
    async function* serialize(source) {
      for await (const record of source) {
        yield Buffer.from(JSON.stringify(record, bigintReplacer) + '\n', 'utf8');
      }
    },
    createGzip(),
    createWriteStream(outputPath)
  );

  return { processed, successful, failed };
}
```
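This self-contained sketch shows the record layout that #parseRecord() expects. It builds one synthetic 256-byte record, parses it with the same offsets, and serialises it. The field values are made up. The sketch also shows why a replacer is needed: readBigInt64BE() returns a BigInt, and JSON.stringify() throws a TypeError on BigInt values.

```javascript
const RECORD_SIZE = 256;

// Build one synthetic record using the field offsets from SettlementParser
function buildRecord({ txId, amount, senderId, recipientId, timestamp, status }) {
  const buf = Buffer.alloc(RECORD_SIZE, 0x20); // space-padded fixed-width record
  buf.write(txId, 0, 32, 'ascii');             // length argument keeps each field in bounds
  buf.writeBigInt64BE(amount, 32);
  buf.write(senderId, 40, 32, 'ascii');
  buf.write(recipientId, 72, 32, 'ascii');
  buf.writeUInt32BE(timestamp, 104);
  buf.writeUInt8(status, 108);
  return buf;
}

function parseRecord(buf) {
  return {
    txId: buf.toString('ascii', 0, 32).trim(),
    amount: buf.readBigInt64BE(32),
    senderId: buf.toString('ascii', 40, 72).trim(),
    recipientId: buf.toString('ascii', 72, 104).trim(),
    timestamp: buf.readUInt32BE(104),
    status: buf.readUInt8(108),
  };
}

const bigintReplacer = (_key, value) => (typeof value === 'bigint' ? value.toString() : value);

const record = parseRecord(buildRecord({
  txId: 'UPI000000000001', amount: 150000n, senderId: 'alice@bank',
  recipientId: 'bob@bank', timestamp: 1747440000, status: 1,
}));

let plainError = null;
try {
  JSON.stringify(record); // throws: BigInt has no default JSON representation
} catch (err) {
  plainError = err;
}

const line = JSON.stringify(record, bigintReplacer); // amount serialised as "150000"
console.log(line);
```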
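Approximate memory profile during this operation on a 3GB file (illustrative figures; measure your own workload with process.memoryUsage()):
- V8 heap: ~40–80MB (the pipeline stream objects and parsed records in flight)
- Off-heap: on the order of the 4MB read highWaterMark (a chunk or two queued between stages) plus zlib's internal buffers
- Total process memory: < 200MB, largely independent of file size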
zlib Streaming: Decompressing Large Blockchain Archives
```javascript
// Stream-decompress a compressed blockchain block archive
import { createGunzip } from 'node:zlib';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// zlib.createGunzip() is already a Transform stream.
// It runs the decompression in native code, with its buffers off-heap.
await pipeline(
  createReadStream('blocks-archive.gz', {
    highWaterMark: 2 * 1024 * 1024, // 2MB read chunks
  }),
  createGunzip(),    // Transform: compressed → decompressed bytes
  new BlockParser(), // Transform: bytes → block objects (your own class, not shown)
  new BlockWriter()  // Writable: block objects → database (your own class, not shown)
);
```
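zlib keeps its working state in native memory outside V8's heap, including the sliding window (32KB by default, windowBits: 15). On the decompression side, the main throughput knob is chunkSize, the size of each output chunk:
```javascript
import zlib from 'node:zlib';

const gunzip = zlib.createGunzip({
  chunkSize: 64 * 1024, // 64KB output chunks (default zlib.constants.Z_DEFAULT_CHUNK, 16KB)
  // level applies only when compressing (createGzip); it has no effect on decompression
});
```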
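This sketch shows the chunkSize trade-off. It decompresses the same 1MB of synthetic, highly compressible data with 16KB and 64KB output chunks and counts what reaches the consumer. Larger chunks mean fewer, bigger pushes through the pipeline. The best value depends on your data and downstream stages, and chunkStats is a hypothetical helper name.

```javascript
import { gzipSync, createGunzip } from 'node:zlib';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const original = Buffer.alloc(1024 * 1024, 'settlement-record;'); // 1MB of repeated text
const compressed = gzipSync(original);

// Hypothetical helper: decompress with a given chunkSize and summarise the output chunks
async function chunkStats(chunkSize) {
  let count = 0, largest = 0, total = 0;
  await pipeline(
    Readable.from([compressed]),
    createGunzip({ chunkSize }),
    async function consume(source) {
      for await (const chunk of source) {
        count++;
        total += chunk.length;
        largest = Math.max(largest, chunk.length);
      }
    },
  );
  return { count, largest, total };
}

const small = await chunkStats(16 * 1024); // the default chunk size
const large = await chunkStats(64 * 1024);
console.log('16KB chunks:', small);
console.log('64KB chunks:', large);
```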
Summary
| Concept | Key Takeaway |
|---|---|
| Off-heap memory | Outside the V8 heap. Not scanned by the GC and not limited by --max-old-space-size; bounded by system RAM. Freed when the owning Buffer is collected. |
| Buffer.alloc() | Zero-filled. Slower. Use when data is security-sensitive or may be read before being fully written. |
| Buffer.allocUnsafe() | No zeroing. Fast. Slices allocations under 4KB from a shared 8KB pool. Use for write-first buffers. |
| Buffer.allocUnsafeSlow() | No zeroing. Never pooled. Use for small buffers retained long-term. |
| Transform stream | Readable + Writable. Process chunks as they arrive: _transform for chunks, _flush for end. |
| _flush | Always call callback(). Handles remaining buffered state at stream end. |
| pipeline() | Correct backpressure + error propagation + cleanup. Always use over .pipe(). |
| Async generator stages | Composable pipeline stages with natural async/await semantics. No custom stream class needed. |
| Buffer pooling | Pre-allocate reusable off-heap buffers. No per-block buffer allocation in the hot path. |
| HWM on createReadStream | highWaterMark controls chunk size. 4MB is a reasonable starting point for block archive ingestion. |
| zlib off-heap | Decompression buffers are off-heap. Tune chunkSize for the throughput vs memory trade-off. |
Off-heap Buffers and streams handle data throughput. Module 6 handles CPU throughput — how to saturate all cores on a large server using cluster and worker_threads, and how to minimize the IPC overhead that typically defeats the purpose of multi-process scaling.
Next: Module 6 — Core Scaling: Multi-Process Clustering & IPC Latency →