Protocol Buffers vs JSON-over-HTTP, Kafka consumer group mechanics for 500K msg/sec, and event sourcing for UPI transaction ledgers.
Module 10 — High-Performance IPC: gRPC, Kafka & Event Streams
What this module covers: When your blockchain indexer needs to stream 500K events/second to a downstream analytics service, JSON-over-HTTP REST adds measurable overhead at every layer: per-request connection overhead in HTTP/1.1, JSON serialization CPU cost, and protocol parsing latency. gRPC over HTTP/2 with Protocol Buffers removes most of the connection overhead and much of the serialization cost. Kafka adds persistence, replay, and fault tolerance. This module covers the performance differences, the implementation of streaming gRPC endpoints in Node.js, Kafka consumer group mechanics for high-throughput consumption, and event sourcing for payment ledgers that need full state replay.
The Baseline: JSON-over-HTTP REST Overhead
Before measuring alternatives, establish what REST actually costs at high throughput:
Per-request overhead:
- TCP connection (or TLS handshake if not keep-alive): 1–3ms
- HTTP/1.1 header parsing: variable, grows with header count
- JSON.parse() on request body: ~1ms per 100KB
- Business logic
- JSON.stringify() on response: ~0.5ms per 100KB
- HTTP response header serialization
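For a payment gateway doing 10,000 inter-service calls/second with a 5KB average payload:
- JSON serialization: at the rates above, 0.05ms to parse plus 0.025ms to stringify each payload, so ~750ms of CPU per second
- HTTP header parsing: ~30ms total per second
- TCP connection overhead (without keep-alive): 1–3ms × 10,000 = 10,000–30,000ms of cumulative connection-setup latency per second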
With HTTP keep-alive: TCP overhead drops. JSON overhead remains. At 500K calls/second, JSON becomes the dominant cost.
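These per-call costs scale linearly with call rate and payload size, so a tiny cost model makes the arithmetic explicit and lets you plug in your own numbers. A minimal sketch: the default rates are the per-100KB figures from the list above, and measureParseMsPer100KB (with its made-up sample record) is a hypothetical helper for getting a rough local rate. It is not a rigorous benchmark: there is no warm-up and only one payload shape.

```javascript
// Back-of-envelope JSON CPU cost. Defaults are the per-100KB rates from the list above.
// Returns milliseconds of CPU per second of traffic (1,000ms ≈ one fully busy core).
function jsonCpuMsPerSecond(callsPerSec, payloadKB, { parseMsPer100KB = 1, stringifyMsPer100KB = 0.5 } = {}) {
  return (callsPerSec * payloadKB * (parseMsPer100KB + stringifyMsPer100KB)) / 100;
}

// Rough local measurement of JSON.parse cost, normalized to ms per 100KB of input
function measureParseMsPer100KB(iterations = 200) {
  const record = { hash: 'ab'.repeat(32), sender: 'merchant-1', amount: '100', status: 'confirmed' };
  const text = JSON.stringify(Array.from({ length: 1000 }, () => record)); // ASCII, so chars = bytes
  const start = process.hrtime.bigint();
  for (let i = 0; i < iterations; i++) JSON.parse(text);
  const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
  return (elapsedMs / iterations) * (100_000 / text.length);
}

console.log(jsonCpuMsPerSecond(10_000, 5));  // 750
console.log(jsonCpuMsPerSecond(500_000, 5)); // 37500
```

At 500K calls/second the same model gives 37,500ms of CPU per second, roughly 37 cores spent on JSON alone. Passing a measured rate, for example jsonCpuMsPerSecond(500_000, 5, { parseMsPer100KB: measureParseMsPer100KB() }), replaces the assumed parse rate with one from your hardware.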
Protocol Buffers: Binary Serialization
Protocol Buffers (protobuf) is a binary serialization format. You define a schema in a .proto file, and a code generator produces type-safe encoder/decoder functions.
```protobuf
// transaction.proto
syntax = "proto3";

package blockchain;

enum TxStatus {
  TX_STATUS_PENDING = 0;
  TX_STATUS_CONFIRMED = 1;
  TX_STATUS_FAILED = 2;
}

message Transaction {
  bytes hash = 1;          // always 32 bytes (bytes fields are length-prefixed on the wire)
  int64 block_height = 2;
  string sender = 3;
  string recipient = 4;
  string amount = 5;       // string to avoid int64 precision issues in JS
  TxStatus status = 6;     // encoded as a varint, same as int32
  int64 timestamp = 7;
}

message TransactionBatch {
  repeated Transaction transactions = 1;
  int32 total = 2;
}
```
Generating Node.js code:
```bash
npm install -g @bufbuild/protoc-gen-es @bufbuild/buf
buf generate   # reads buf.gen.yaml; generates TypeScript types + encode/decode functions
```
Size and parse-time comparison for a typical transaction:
| Format | Encoded size | Parse time (1K messages) |
|---|---|---|
| JSON (human-readable) | 340 bytes | 18ms |
| JSON (minified) | 220 bytes | 12ms |
| Protobuf binary | 82 bytes | 2ms |
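Protobuf is roughly 2.7x smaller than minified JSON (about 4x smaller than human-readable JSON) and 6–9x faster to parse. At 500K messages/second, this difference is:
- JSON (minified): 110MB/sec on the wire + 6,000ms of parse CPU per second (about six cores)
- Protobuf: 41MB/sec on the wire + 1,000ms of parse CPU per second (about one core)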
The bandwidth reduction matters for inter-datacenter links. The CPU reduction matters for everything.
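To see where the size savings come from, here is a minimal hand-rolled encoder for the Transaction message, written directly against the protobuf wire format: base-128 varints, a field key of (field_number << 3) | wire_type, and length-prefixed bytes and strings. It is an illustration only. Real code should use the generated encoders, this sketch handles only non-negative integers, and the sample values are made up.

```javascript
// Base-128 varint: 7 bits per byte, high bit set on every byte except the last.
// Non-negative values only (negative int64s need a 10-byte two's complement form).
function encodeVarint(value) {
  let v = BigInt(value);
  const bytes = [];
  while (v >= 0x80n) {
    bytes.push(Number((v & 0x7fn) | 0x80n));
    v >>= 7n;
  }
  bytes.push(Number(v));
  return Buffer.from(bytes);
}

// Field key = (field number << 3) | wire type (0 = varint, 2 = length-delimited)
const fieldKey = (field, wireType) => encodeVarint((field << 3) | wireType);

// proto3 omits fields holding their default value (0, empty string/bytes)
function varintField(field, value) {
  return BigInt(value) === 0n ? Buffer.alloc(0) : Buffer.concat([fieldKey(field, 0), encodeVarint(value)]);
}
function bytesField(field, buf) {
  return buf.length === 0 ? Buffer.alloc(0) : Buffer.concat([fieldKey(field, 2), encodeVarint(buf.length), buf]);
}
const stringField = (field, s) => bytesField(field, Buffer.from(s, 'utf8'));

function encodeTransaction(tx) {
  return Buffer.concat([
    bytesField(1, tx.hash),
    varintField(2, tx.blockHeight),
    stringField(3, tx.sender),
    stringField(4, tx.recipient),
    stringField(5, tx.amount),
    varintField(6, tx.status),
    varintField(7, tx.timestamp),
  ]);
}

const sample = {
  hash: Buffer.alloc(32, 0xab),
  blockHeight: 19_000_000,
  sender: 'alice',
  recipient: 'bob',
  amount: '100',
  status: 1,                 // 1 = confirmed
  timestamp: 1_700_000_000,
};
const protoBytes = encodeTransaction(sample).length;
const jsonBytes = Buffer.byteLength(JSON.stringify({ ...sample, hash: sample.hash.toString('hex') }));
console.log({ protoBytes, jsonBytes }); // { protoBytes: 64, jsonBytes: 182 }
```

Most of the saving comes from two places: field names are replaced by one-byte keys, and binary values (the hash, the integers) are not expanded into hex or decimal text.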
gRPC: HTTP/2 + Protocol Buffers
gRPC is an RPC framework that uses HTTP/2 for transport and Protocol Buffers for serialization. It provides:
- HTTP/2 multiplexing: multiple streams over a single TCP connection
- Bidirectional streaming: client and server can both stream data simultaneously
- Type safety: generated stub code handles serialization
- Load balancing: built-in support for multiple backends
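On the wire, gRPC carries each Protobuf message on an HTTP/2 stream as a length-prefixed message: a 1-byte compressed flag, a 4-byte big-endian length, then the message bytes (per gRPC's "gRPC over HTTP/2" protocol document). This is what lets a single stream carry many messages for the streaming call types. The sketch below frames and deframes messages with plain Buffers to show the format. grpc-js does this for you, and frameMessage and deframeMessages are illustrative names, not library functions.

```javascript
// Frame one serialized message as a gRPC length-prefixed message
function frameMessage(messageBytes, compressed = false) {
  const header = Buffer.alloc(5);
  header.writeUInt8(compressed ? 1 : 0, 0);     // Compressed-Flag
  header.writeUInt32BE(messageBytes.length, 1); // Message-Length (big-endian)
  return Buffer.concat([header, messageBytes]);
}

// Split a buffer of back-to-back frames, as a streaming RPC delivers them.
// Returns complete messages plus any trailing partial frame to keep for the next chunk.
function deframeMessages(buf) {
  const messages = [];
  let offset = 0;
  while (offset + 5 <= buf.length) {
    const length = buf.readUInt32BE(offset + 1);
    if (offset + 5 + length > buf.length) break; // partial frame: wait for more bytes
    messages.push({
      compressed: buf.readUInt8(offset) === 1,
      message: buf.subarray(offset + 5, offset + 5 + length),
    });
    offset += 5 + length;
  }
  return { messages, rest: buf.subarray(offset) };
}

// Two tiny protobuf messages (field 1 = 1, then field 1 = 2) sent on one stream
const wire = Buffer.concat([frameMessage(Buffer.from([0x08, 0x01])), frameMessage(Buffer.from([0x08, 0x02]))]);
console.log(wire.toString('hex')); // 0000000002080100000000020802
```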
The Four gRPC Call Types
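```protobuf
// blockchain_indexer.proto
syntax = "proto3";

package blockchain;

import "transaction.proto";

// Request/response shapes below are illustrative; adapt them to your indexer
message GetTransactionRequest { bytes hash = 1; }
message SubscribeRequest { int64 start_height = 1; }
message Block { int64 height = 1; repeated Transaction transactions = 2; }
message IngestResult { int32 ingested = 1; int32 failed = 2; }
message SyncRequest { int64 from_height = 1; }
message SyncResponse { Block block = 1; }

service BlockchainIndexer {
  // 1. Unary: single request, single response
  rpc GetTransaction (GetTransactionRequest) returns (Transaction);
  // 2. Server streaming: single request, multiple responses
  rpc SubscribeToBlocks (SubscribeRequest) returns (stream Block);
  // 3. Client streaming: multiple requests, single response
  rpc IngestTransactions (stream Transaction) returns (IngestResult);
  // 4. Bidirectional streaming: both sides stream
  rpc SyncBlocks (stream SyncRequest) returns (stream SyncResponse);
}
```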
Implementing a Streaming gRPC Server in Node.js
```javascript
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

const packageDef = protoLoader.loadSync('blockchain_indexer.proto', {
  keepCase: true,   // keep snake_case field names (start_height, not startHeight)
  longs: String,    // int64 fields arrive as strings
  enums: String,
  defaults: true,
  oneofs: true,
});
const proto = grpc.loadPackageDefinition(packageDef).blockchain;

// blockEventBus, writeTransaction and getTransactionHandler are app-specific (not shown)

// Server streaming: push new blocks to every connected subscriber
function subscribeToBlocks(call) {
  const startHeight = Number(call.request.start_height);

  // Register this subscriber
  const unsubscribe = blockEventBus.subscribe((block) => {
    if (Number(block.height) >= startHeight) {
      call.write(block); // the stream carries Block messages, so write the block itself
    }
  });

  call.on('cancelled', () => {
    unsubscribe(); // clean up when the client disconnects
  });
  // Don't call call.end() here: the stream stays open until the client
  // cancels or the server decides it is done and calls end()
}

// Client streaming: receive transactions for indexing, reply once at the end
async function ingestTransactions(call, callback) {
  const results = { ingested: 0, failed: 0 };
  try {
    // for await reads one message at a time, so every write settles before the
    // final response is sent (an on('data') handler would not wait for them)
    for await (const transaction of call) {
      try {
        await writeTransaction(transaction);
        results.ingested++;
      } catch {
        results.failed++;
      }
    }
    callback(null, results);
  } catch (err) {
    callback(err);
  }
}

const server = new grpc.Server();
server.addService(proto.BlockchainIndexer.service, {
  getTransaction: getTransactionHandler,
  subscribeToBlocks,
  ingestTransactions,
  // syncBlocks is not implemented here; grpc-js answers it with UNIMPLEMENTED
});

// Insecure credentials for local testing only; use TLS credentials in production
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), (err, port) => {
  if (err) throw err;
  // grpc-js >= 1.10 starts serving once bound; older versions also need server.start() here
  console.log(`gRPC server running on :${port}`);
});
```
gRPC Client with Keepalive and Deadline
```javascript
const client = new proto.BlockchainIndexer(
  'indexer-service:50051',
  grpc.credentials.createInsecure(), // use TLS credentials in production
  {
    'grpc.keepalive_time_ms': 30_000,         // ping every 30s
    'grpc.keepalive_timeout_ms': 10_000,      // drop the connection if a ping gets no ack in 10s
    'grpc.keepalive_permit_without_calls': 1, // keep pinging even with no active calls
  }
);

// Unary call with a 5-second deadline
function getTransaction(hashHex) {
  return new Promise((resolve, reject) => {
    const deadline = new Date(Date.now() + 5000);
    // hash is a bytes field, so send a Buffer
    client.getTransaction({ hash: Buffer.from(hashHex, 'hex') }, { deadline }, (err, response) => {
      if (err) reject(err); // err.code is a grpc.status value, e.g. DEADLINE_EXCEEDED
      else resolve(response);
    });
  });
}

// Client streaming: send a batch of transactions, receive one IngestResult
function ingestBatch(transactions) {
  return new Promise((resolve, reject) => {
    const stream = client.ingestTransactions((err, result) => {
      if (err) reject(err);
      else resolve(result);
    });
    for (const tx of transactions) {
      stream.write(tx); // for very large batches, honor write()'s return value and wait for 'drain'
    }
    stream.end();
  });
}
```
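The client above sets a deadline but does not retry. Recent grpc-js versions can also retry through a service config, but a small wrapper makes the policy explicit. A minimal sketch, assuming you only retry idempotent calls (reads such as GetTransaction) and treat only status 14 (UNAVAILABLE) as transient. withRetry and its option names are hypothetical, not part of grpc-js.

```javascript
// gRPC status code 14 (UNAVAILABLE) is transient and usually safe to retry for
// idempotent calls; grpc-js exposes the same value as grpc.status.UNAVAILABLE.
const UNAVAILABLE = 14;

// Retry an async gRPC call with exponential backoff and full jitter.
// `attempt` receives a fresh Date deadline for each try.
async function withRetry(attempt, {
  maxAttempts = 3,
  baseDelayMs = 100,
  maxDelayMs = 2_000,
  perAttemptTimeoutMs = 5_000,
  retryableCodes = [UNAVAILABLE],
} = {}) {
  for (let n = 1; ; n++) {
    const deadline = new Date(Date.now() + perAttemptTimeoutMs);
    try {
      return await attempt(deadline);
    } catch (err) {
      if (n >= maxAttempts || !retryableCodes.includes(err.code)) throw err;
      const capMs = Math.min(maxDelayMs, baseDelayMs * 2 ** (n - 1));
      await new Promise((resolve) => setTimeout(resolve, Math.random() * capMs)); // full jitter
    }
  }
}

// Usage with the client above (GetTransaction is a read, so retrying is safe):
// const tx = await withRetry((deadline) => new Promise((resolve, reject) =>
//   client.getTransaction({ hash }, { deadline }, (err, res) => (err ? reject(err) : resolve(res)))));
```

Giving each attempt its own deadline keeps one slow attempt from consuming the whole budget. Jitter spreads retries from many clients so they don't hit a recovering backend at the same instant.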
gRPC vs REST: Throughput Comparison
| Metric | REST + JSON | gRPC + Protobuf |
|---|---|---|
| Payload size (5KB object) | 5,000 bytes | ~850 bytes |
| Serialization (100K msg/sec) | ~500ms CPU | ~85ms CPU |
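| Connections (1K concurrent requests from one client) | Up to 1K TCP sockets (HTTP/1.1: one in-flight request per connection) | 1 TCP socket (multiplexed HTTP/2 streams) |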
| Latency (p99, same network) | 8ms | 2ms |
| Streaming | Workarounds (SSE, WebSocket) | Native (all 4 types) |
gRPC is a strong default for inter-service communication above ~10K calls/second or when bidirectional streaming is required.
Kafka: Distributed Event Ledger
Kafka is a distributed log. Unlike a traditional message queue (which deletes messages after consumption), Kafka retains messages for a configurable retention period. Each consumer group tracks its own offset in each partition, and consumers pull messages from the brokers; Kafka does not push them.
Core Concepts
Topic: a named log, such as transactions, blocks, or payment-events.
Partition: a topic is split into partitions for parallelism. Each partition is an ordered, immutable log. Within a partition, messages are ordered by offset. Across partitions, there is no ordering guarantee.
Consumer group: a set of consumers that collectively read a topic. Kafka assigns partitions to consumers, and each partition is consumed by exactly one consumer in the group. If you have 8 partitions and 8 consumers: each consumer reads one partition. If you have 8 partitions and 4 consumers: each consumer reads two partitions. If you have 8 partitions and 12 consumers: 4 consumers sit idle, because the partition count caps a group's parallelism.
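The assignment rule can be illustrated with a simplified round-robin assignor for a single topic. This is a sketch, not Kafka's implementation. Real assignors (kafkajs defaults to a round-robin one) also handle multiple topics and members with different subscriptions, and the member IDs below are made up.

```javascript
// Simplified round-robin assignment of one topic's partitions to group members
function assignPartitions(partitionCount, memberIds) {
  const members = [...memberIds].sort(); // every member must compute the same order
  const assignment = Object.fromEntries(members.map((m) => [m, []]));
  for (let p = 0; p < partitionCount; p++) {
    assignment[members[p % members.length]].push(p);
  }
  return assignment;
}

console.log(assignPartitions(8, ['c1', 'c2', 'c3', 'c4']));
// { c1: [ 0, 4 ], c2: [ 1, 5 ], c3: [ 2, 6 ], c4: [ 3, 7 ] }
```

With 12 members and 8 partitions, the last four members in sorted order receive empty lists, which is the idle-consumer case above.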
```javascript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'blockchain-indexer',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-256',
    username: 'indexer',
    password: process.env.KAFKA_PASS,
  },
});
```
Producer: Batching and Compression
```javascript
import { CompressionTypes, CompressionCodecs } from 'kafkajs';
import LZ4 from 'kafkajs-lz4';

// kafkajs ships only the GZIP codec; LZ4 needs an external codec registered like this
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec;

const producer = kafka.producer({
  maxInFlightRequests: 5,     // max concurrent in-flight requests per broker connection
  idempotent: true,           // broker de-duplicates retried sends; requires acks: -1
  transactionTimeout: 60_000, // only used by transactional producers (those with a transactionalId)
});
await producer.connect();

// High-throughput batch send
async function publishTransactionBatch(transactions) {
  await producer.sendBatch({
    topicMessages: [{
      topic: 'transactions',
      messages: transactions.map(tx => ({
        key: tx.hash, // partition key: same hash → same partition; uniform hashes spread load evenly
        value: Buffer.from(serializeProtobuf(tx)), // protobuf encoding (your generated encoder)
        timestamp: tx.timestamp.toString(),
      })),
    }],
    compression: CompressionTypes.LZ4, // fast compression, good ratio
    acks: -1,                          // wait for all in-sync replicas (durability)
  });
}
```
Compression at 500K messages/second (typical ratios; actual ratios depend heavily on the payload):
- LZ4: 2–3x compression, minimal CPU cost. Best for throughput.
- Snappy: 1.5–2x compression, very fast. Good default.
- GZIP: 3–5x compression, high CPU. Use for storage efficiency, not throughput.
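Ratios like these depend on how repetitive the payload is. Random data, such as 32-byte transaction hashes, is essentially incompressible for any lossless codec. Node's standard library includes only zlib-family codecs, so this sketch uses gzip to show the effect on two made-up payloads. LZ4 and Snappy would show the same contrast at different absolute ratios.

```javascript
const zlib = require('node:zlib');
const crypto = require('node:crypto');

// Compression ratio = original size / compressed size
const gzipRatio = (buf) => buf.length / zlib.gzipSync(buf).length;

// High-entropy payload: 1,000 random 32-byte hashes
const hashes = crypto.randomBytes(32 * 1000);

// Low-entropy payload: repetitive JSON records
const records = Buffer.from(JSON.stringify(
  Array.from({ length: 1000 }, (_, i) => ({ status: 'confirmed', sender: `merchant-${i % 20}`, amount: '100' }))
));

console.log(`random hashes: ${gzipRatio(hashes).toFixed(2)}x`); // about 1x: nothing to gain
console.log(`JSON records:  ${gzipRatio(records).toFixed(2)}x`); // many times smaller
```

Protobuf-encoded transactions sit between these extremes: the field keys and small integers compress a little, the hashes not at all.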
Consumer Group: Partition Assignment and Rebalancing
```javascript
const consumer = kafka.consumer({
  groupId: 'transaction-processor',
  sessionTimeout: 30_000,        // time before the broker considers this consumer dead
  heartbeatInterval: 3_000,      // send a heartbeat every 3s
  maxBytesPerPartition: 1048576, // 1MB max per fetch per partition
});

await consumer.connect();
await consumer.subscribe({ topics: ['transactions'], fromBeginning: false });

// Fires after the consumer (re)joins the group, including after every rebalance
consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  // memberAssignment is an object such as { transactions: [0, 1] }
  console.log(`Joined group, assigned partitions: ${JSON.stringify(payload.memberAssignment)}`);
});

await consumer.run({
  partitionsConsumedConcurrently: 4, // process up to 4 partitions concurrently per consumer
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop on shutdown, or when a rebalance has made this batch stale
      if (!isRunning() || isStale()) break;

      const tx = deserializeProtobuf(message.value);
      await processTransaction(tx);

      // Mark this offset as processed; kafkajs commits resolved offsets
      // according to its autoCommit settings
      resolveOffset(message.offset);

      // Heartbeat to avoid a session timeout during long batches
      // (sent according to heartbeatInterval, so calling it per message is cheap)
      await heartbeat();
    }
  },
});
```
Consumer Lag: The Critical Operational Metric
Consumer lag, per partition = (latest offset in the partition) - (the group's committed offset for that partition). Total lag is the sum across partitions.
Lag = 0: consumer is caught up. Lag > 0 but stable: a normal working backlog. Lag growing: consumer cannot keep up with the producer rate.
```javascript
// Monitor consumer lag from within Node.js
const admin = kafka.admin();
await admin.connect();

async function getConsumerLag() {
  const offsets = await admin.fetchOffsets({
    groupId: 'transaction-processor',
    topics: ['transactions'],
  });
  const topicOffsets = await admin.fetchTopicOffsets('transactions'); // [{ partition, offset, high, low }]

  let totalLag = 0;
  for (const partition of offsets[0].partitions) {
    const end = topicOffsets.find(t => t.partition === partition.partition);
    // '-1' means the group has not committed on this partition yet:
    // count from the low watermark instead of from -1
    const committed = partition.offset === '-1' ? Number(end.low) : Number(partition.offset);
    const lag = Math.max(0, Number(end.offset) - committed);
    totalLag += lag;
    consumerLagGauge.labels(partition.partition.toString()).set(lag); // e.g. a prom-client Gauge
  }
  return totalLag;
}

setInterval(async () => {
  try {
    const lag = await getConsumerLag();
    if (lag > 500_000) {
      logger.error(`CRITICAL: consumer lag ${lag}; add consumer instances (up to the partition count) or speed up processing`);
    }
  } catch (err) {
    logger.error(`Lag check failed: ${err.message}`);
  }
}, 10_000);
```
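A single lag reading cannot tell "behind but recovering" apart from "falling further behind". Comparing two readings, for example consecutive getConsumerLag() results from the 10-second interval above, separates the cases listed earlier. A minimal sketch; lagTrend and its state names are hypothetical.

```javascript
// Classify consumer lag from two samples taken intervalMs apart
function lagTrend(prevLag, currLag, intervalMs) {
  const perSecond = ((currLag - prevLag) * 1000) / intervalMs; // net change in messages/sec
  if (currLag === 0) return { state: 'caught-up', perSecond };
  if (perSecond > 0) return { state: 'falling-behind', perSecond };
  if (perSecond === 0) return { state: 'stable', perSecond };
  // Lag is shrinking: estimate how long until the consumer catches up at this rate
  return { state: 'draining', perSecond, secondsToDrain: currLag / -perSecond };
}

console.log(lagTrend(1_000, 3_000, 10_000)); // { state: 'falling-behind', perSecond: 200 }
console.log(lagTrend(5_000, 4_000, 10_000)); // { state: 'draining', perSecond: -100, secondsToDrain: 40 }
```

Alerting on a sustained positive perSecond catches a consumer that cannot keep up well before the absolute threshold is reached.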
Event Sourcing for Payment Ledgers
Event sourcing stores every state change as an immutable event, instead of storing current state. The current state is derived by replaying all events.
Traditional approach:
Table: accounts { id, balance, updated_at }
UPDATE accounts SET balance = balance - 100 WHERE id = 'acc-1'
→ Current state only. History is gone.
Event sourcing:
Table: account_events { id, account_id, event_type, amount, timestamp }
INSERT INTO account_events VALUES ('evt-1', 'acc-1', 'DEBIT', 100, NOW())
→ Full history. Balance = SUM(credits) - SUM(debits).
Why event sourcing for payment ledgers:
- Audit trail: every transaction is immutable. You can prove the exact sequence of operations that led to any balance.
- Replay: if you discover a bug in your balance calculation logic, replay all events with the fixed logic to get correct current balances.
- Temporal queries: what was account X's balance on March 15th? Replay events up to March 15th.
- Kafka as the event store: Kafka's log retention makes it a natural event store, provided the ledger topic keeps events indefinitely (retention.ms=-1) and does not use log compaction, which would discard all but the latest event per key.
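Replay and temporal queries both reduce to a fold over the event list. A minimal in-memory sketch, assuming the event shape used by the Kafka code below (eventType, accountId, amount as an integer string in paise, timestamp in ms). The sample events are made up, and a real system would stream events from Kafka rather than hold them in an array.

```javascript
// Apply one event to a running balance (BigInt avoids float rounding on money)
function applyEvent(balance, event) {
  switch (event.eventType) {
    case 'ACCOUNT_CREDITED': return balance + BigInt(event.amount);
    case 'ACCOUNT_DEBITED':  return balance - BigInt(event.amount);
    default:                 return balance; // events this projection doesn't handle
  }
}

// Current balance: fold every event. Temporal query: fold only events up to `asOf`.
function balanceAt(events, accountId, asOf = Infinity) {
  return events
    .filter((e) => e.accountId === accountId && e.timestamp <= asOf)
    .reduce(applyEvent, 0n);
}

const events = [
  { eventType: 'ACCOUNT_CREDITED', accountId: 'acc-1', amount: '50000', timestamp: 1_000 },
  { eventType: 'ACCOUNT_DEBITED',  accountId: 'acc-1', amount: '10000', timestamp: 2_000 },
  { eventType: 'ACCOUNT_CREDITED', accountId: 'acc-2', amount: '99900', timestamp: 2_500 },
  { eventType: 'ACCOUNT_DEBITED',  accountId: 'acc-1', amount: '5000',  timestamp: 3_000 },
];

console.log(balanceAt(events, 'acc-1'));        // 35000n
console.log(balanceAt(events, 'acc-1', 2_000)); // 40000n
```

Fixing a calculation bug means changing applyEvent and re-running the fold. The events themselves never change.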
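```javascript
import crypto from 'node:crypto';

// Payment ledger event sourcing with Kafka
// Amounts are integer paise carried as strings (assumption for this example)

// Write side: append events
async function debitAccount(accountId, amount, reference) {
  await producer.send({
    topic: 'payment-ledger',
    messages: [{
      key: accountId, // same account always goes to the same partition (per-account ordering)
      value: JSON.stringify({
        eventType: 'ACCOUNT_DEBITED',
        eventId: crypto.randomUUID(),
        accountId,
        amount: amount.toString(),
        reference,
        timestamp: Date.now(),
      }),
    }],
    acks: -1, // wait for all in-sync replicas before confirming
  });
}

// Read side: query the projection maintained by the consumer below
async function getAccountBalance(accountId) {
  const { rows } = await pool.query(
    'SELECT balance FROM account_projections WHERE account_id = $1',
    [accountId]
  );
  return BigInt(rows[0]?.balance ?? 0); // pg returns NUMERIC/BIGINT columns as strings
}

// Consumer: builds and maintains the projection.
// projectionConsumer: its own consumer group, subscribed to 'payment-ledger'.
// Kafka delivers at least once, so each event is applied in a DB transaction that also
// records its eventId (processed_events has event_id as primary key); redeliveries are skipped.
const SIGN = { ACCOUNT_CREDITED: 1n, ACCOUNT_DEBITED: -1n };

await projectionConsumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString());
    const sign = SIGN[event.eventType];
    if (sign === undefined) return; // not a balance-changing event

    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const { rowCount } = await client.query(
        'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING',
        [event.eventId]
      );
      if (rowCount === 1) { // first time this event is seen
        const delta = (sign * BigInt(event.amount)).toString();
        await client.query(
          `INSERT INTO account_projections (account_id, balance) VALUES ($1, $2::numeric)
           ON CONFLICT (account_id) DO UPDATE SET balance = account_projections.balance + $2::numeric`,
          [event.accountId, delta]
        );
      }
      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err; // the offset is not resolved, so kafkajs redelivers the message
    } finally {
      client.release();
    }
  },
});
```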
Snapshots: Avoiding Full Replay
For accounts with millions of historical events, replaying from the beginning is impractical. Snapshots cache the balance at a point in time:
```javascript
// Snapshot an account's balance (e.g. called by the projection consumer every 1,000 events)
async function snapshotAccount(accountId, balance, lastEventOffset) {
  await pool.query(
    `INSERT INTO account_snapshots (account_id, balance, kafka_offset, created_at)
     VALUES ($1, $2, $3, NOW())
     ON CONFLICT (account_id) DO UPDATE SET balance = $2, kafka_offset = $3, created_at = NOW()`,
    [accountId, balance.toString(), lastEventOffset]
  );
}

// Replay from snapshot + subsequent events only
// (rebuildFromBeginning and replayFromOffset are app-specific helpers, not shown)
async function rebuildFromSnapshot(accountId) {
  const snapshot = await pool.query(
    'SELECT balance, kafka_offset FROM account_snapshots WHERE account_id = $1',
    [accountId]
  );
  if (!snapshot.rows[0]) return rebuildFromBeginning(accountId);

  const { balance, kafka_offset } = snapshot.rows[0];
  // kafka_offset is the last event already folded into the snapshot, so resume one past it.
  // Offsets are per partition; keying events by accountId keeps each account on one partition.
  const resumeOffset = (BigInt(kafka_offset) + 1n).toString();
  return replayFromOffset(accountId, resumeOffset, BigInt(balance));
}
```
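The snapshot logic can be checked in memory: a rebuild that starts from a snapshot must reach the same balance as a full replay while touching far fewer events. A minimal sketch under stated assumptions: the log stands in for a single Kafka partition (offsets are per partition), events use the same shape as the ledger code above, and rebuildBalance and takeSnapshot are hypothetical helpers.

```javascript
// Balance change for one event (amounts are integer strings)
function delta(event) {
  if (event.eventType === 'ACCOUNT_CREDITED') return BigInt(event.amount);
  if (event.eventType === 'ACCOUNT_DEBITED') return -BigInt(event.amount);
  return 0n;
}

// Rebuild a balance from one partition's log [{ offset, event }],
// optionally starting from a snapshot { balance, lastOffset }
function rebuildBalance(log, accountId, snapshot = null) {
  let balance = snapshot ? snapshot.balance : 0n;
  const fromOffset = snapshot ? snapshot.lastOffset + 1 : 0; // resume one past the snapshot
  let replayed = 0;
  for (const { offset, event } of log) {
    if (offset < fromOffset || event.accountId !== accountId) continue;
    balance += delta(event);
    replayed++;
  }
  return { balance, replayed };
}

// Snapshot covering everything up to and including lastOffset
function takeSnapshot(log, accountId, lastOffset) {
  const prefix = log.filter(({ offset }) => offset <= lastOffset);
  return { balance: rebuildBalance(prefix, accountId).balance, lastOffset };
}

const log = Array.from({ length: 10_000 }, (_, offset) => ({
  offset,
  event: { eventType: offset % 3 === 0 ? 'ACCOUNT_DEBITED' : 'ACCOUNT_CREDITED', accountId: 'acc-1', amount: '100' },
}));
const snap = takeSnapshot(log, 'acc-1', 8_999);
const full = rebuildBalance(log, 'acc-1');
const fast = rebuildBalance(log, 'acc-1', snap);
console.log(full.balance === fast.balance, full.replayed, fast.replayed); // true 10000 1000
```

The off-by-one matters: resuming at lastOffset instead of lastOffset + 1 would apply the snapshot's final event twice.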
RabbitMQ vs Kafka: When Each Is Correct
| | Kafka | RabbitMQ |
|---|---|---|
| Model | Pull-based log | Push-based queue |
| Message retention | Configurable (days/weeks) | Deleted after consumption |
| Replay | Yes — replay from any offset | No — consumed messages are gone |
| Throughput | 1M+ messages/sec | ~50K messages/sec |
| Ordering | Within partition | Per-queue (with single consumer) |
| Consumer groups | Built-in | Via competing consumers |
| Use case | Event sourcing, audit logs, stream processing | Task queues, RPC, work distribution |
For a blockchain indexer: Kafka. You need replay (to reprocess events after fixing a consumer bug), high throughput (500K tx/sec), and consumer groups for parallel processing.
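For payment notification emails: RabbitMQ. You need per-message acknowledgement, dead letter queues, and retry logic. Delivery is at-least-once on either broker, so the email sender should deduplicate (for example by event ID). Kafka can do this, but it has no per-message acknowledgement or built-in dead letter queue, so you build retries and dead-lettering yourself with extra topics.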
Production Incident: Kafka Partition Imbalance Causing Consumer Hotspot
Context: A payment event stream with 12 partitions, 12 consumer instances. Partition key was senderId. One major payment aggregator (a large e-commerce platform) sent 40% of all payment events. All events from this sender went to one partition.
What happened:
One consumer instance was processing 40% of total volume. Its consumer lag grew continuously. The other 11 instances handled roughly 5.5% each (the remaining 60% split 11 ways). Rebalancing couldn't help: the problem was the partition key, not the number of consumers.
The fix — composite partition key:
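```javascript
import crypto from 'node:crypto';

// Before: senderId as partition key → all traffic from one sender → one partition
const partitionKeyBefore = (tx) => tx.senderId;

// After: composite key spreads one sender across up to 1,000 time buckets
const partitionKeyAfter = (tx) =>
  crypto.createHash('sha256')
    .update(`${tx.senderId}-${tx.timestamp % 1000}`) // sender + time bucket (numeric timestamp)
    .digest('hex')
    .slice(0, 16);

// Still deterministic: the same (sender, bucket) pair always maps to the same partition,
// and no single partition receives 40% of traffic.
// Trade-off: one sender's events are no longer totally ordered, because different
// buckets land on different partitions. Consumers must not rely on per-sender order.
```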
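The effect of the key change can be checked offline by simulating partition assignment. A minimal sketch under stated assumptions: the workload is synthetic (one aggregator sending 40% of 10,000 events across 12 partitions), and partitionFor is a stand-in that uses SHA-256 modulo the partition count, whereas Kafka's default partitioners use murmur2. Any well-mixed hash shows the same skew. The sketch passes the raw composite string to the partitioner. Since the partitioner hashes the key anyway, the extra SHA-256 step in the fix mainly shortens the key.

```javascript
const crypto = require('node:crypto');

// Stand-in partitioner: first 4 bytes of SHA-256(key), modulo the partition count
function partitionFor(key, numPartitions) {
  return crypto.createHash('sha256').update(key).digest().readUInt32BE(0) % numPartitions;
}

// Fraction of all events that land on the busiest partition
function maxPartitionShare(events, keyFn, numPartitions) {
  const counts = new Array(numPartitions).fill(0);
  for (const ev of events) counts[partitionFor(keyFn(ev), numPartitions)]++;
  return Math.max(...counts) / events.length;
}

// Synthetic workload: one aggregator sends 40% of 10,000 events, merchants send the rest
const events = Array.from({ length: 10_000 }, (_, i) => ({
  senderId: i % 5 < 2 ? 'aggregator' : `merchant-${i % 600}`,
  timestamp: 1_700_000_000_000 + i, // ms
}));

const bySender = maxPartitionShare(events, (tx) => tx.senderId, 12);
const byComposite = maxPartitionShare(events, (tx) => `${tx.senderId}-${tx.timestamp % 1000}`, 12);
console.log(`busiest partition: ${(bySender * 100).toFixed(1)}% by sender, ${(byComposite * 100).toFixed(1)}% by composite key`);
```

Keyed by sender, the aggregator's partition always carries at least 40% of events. With the composite key, the busiest partition stays near the ideal 1/12 (about 8%).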
Also added consumer lag alerting per partition (not just total lag) to detect hotspots early:
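```javascript
// partitionLags: [{ partition, lag }], e.g. the per-partition values computed in getConsumerLag
const avgLag = partitionLags.reduce((sum, p) => sum + p.lag, 0) / partitionLags.length;

for (const { partition, lag } of partitionLags) {
  if (lag > avgLag * 3) {
    logger.warn(`Partition ${partition} hotspot: lag ${lag} vs avg ${Math.round(avgLag)}`);
  }
}
```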
Summary
| Concept | Key Takeaway |
|---|---|
| Protobuf vs JSON | ~2.7–4x smaller, 6–9x faster parse. Critical at > 100K msg/sec. |
| gRPC over HTTP/2 | Multiplexed streams, 4 call types. Native streaming support. ~4x lower p99 latency than REST in the comparison above. |
| gRPC server streaming | call.write() pushes to client. call.on('cancelled') for cleanup. |
| Kafka retention | Messages persist after consumption. Replay from any offset. |
| Consumer groups | Partitions assigned to consumers. One partition → one consumer per group. |
| Consumer lag | Growing lag = consumer can't keep up. Scale consumers or optimize processing. |
| Partition key | High-cardinality composite keys prevent hotspots. Never use a skewed key. |
| acks: -1 | Wait for all in-sync replicas before confirming publish. Required for durability. |
| LZ4 compression | Best throughput/CPU ratio. Use for high-volume streams. |
| Event sourcing | Immutable event log. Full history. Replay to any point in time. |
| Snapshots | Cache balance at N events to avoid full replay on every query. |
| Kafka vs RabbitMQ | Kafka for event streams/replay. RabbitMQ for task queues with ACK semantics. |
You now have the communication backbone. Module 11 covers how to keep the code clean as these systems grow — DDD applied to banking ledgers, dependency injection that lets you swap transports, and CQRS that separates the write model from the read model.
Next: Module 11 — Enterprise Architecture for State-Heavy Systems: DDD & Clean Architecture →