Module A-11

Protocol Buffers vs JSON-over-HTTP, Kafka consumer group mechanics for 500K msg/sec, and event sourcing for UPI transaction ledgers.

Module 10 — High-Performance IPC: gRPC, Kafka & Event Streams

What this module covers: When your blockchain indexer needs to stream 500K events/second to a downstream analytics service, JSON-over-HTTP REST adds measurable overhead at every layer: HTTP/1.1 per-request connection overhead, JSON serialization CPU cost, and protocol parsing latency. gRPC over HTTP/2 with Protocol Buffers eliminates most of this. Kafka adds persistence, replay, and fault tolerance. This module covers the exact performance differences, the implementation of streaming gRPC endpoints in Node.js, Kafka consumer group mechanics for high-throughput consumption, and event sourcing for payment ledgers that need full state replay.


The Baseline: JSON-over-HTTP REST Overhead

Before measuring alternatives, establish what REST actually costs at high throughput:

Per-request overhead:

  1. TCP connection setup, plus a TLS handshake, when keep-alive is not used: 1–3ms
  2. HTTP/1.1 header parsing — variable, grows with header count
  3. JSON.parse() on request body — ~1ms per 100KB
  4. Business logic
  5. JSON.stringify() on response — ~0.5ms per 100KB
  6. HTTP response headers serialization

For a payment gateway doing 10,000 inter-service calls/second:

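  • JSON serialization at 5KB average payload: 50MB/sec through JSON.parse() and JSON.stringify(), or ~750ms of CPU per second at the rates above
  • HTTP header parsing: ~30ms total per second
  • TCP connection setup (without keep-alive): 10,000–30,000ms of cumulative setup time per second (1–3ms × 10,000 calls)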

With HTTP keep-alive, the TCP cost largely disappears, but the JSON cost remains and grows linearly with the call rate. At 500K calls/second, JSON becomes the dominant cost.
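The per-100KB rates above depend on payload shape, V8 version, and hardware, so it is worth measuring them on your own messages. A minimal sketch, assuming Node.js 16+ (where performance is a global); measureJsonCost and the sample payload are illustrative, not part of any library:

```javascript
// Rough micro-benchmark: average JSON.stringify / JSON.parse time, scaled to ms per 100KB.
// Numbers are noisy (JIT warm-up, GC, CPU frequency); treat them as order-of-magnitude.
function measureJsonCost(payload, iterations = 2_000) {
  const text = JSON.stringify(payload);
  const sizeKB = Buffer.byteLength(text) / 1024;

  // Warm-up so both code paths are JIT-optimized before timing
  for (let i = 0; i < 200; i++) JSON.parse(JSON.stringify(payload));

  let start = performance.now();
  for (let i = 0; i < iterations; i++) JSON.stringify(payload);
  const stringifyMs = (performance.now() - start) / iterations;

  start = performance.now();
  for (let i = 0; i < iterations; i++) JSON.parse(text);
  const parseMs = (performance.now() - start) / iterations;

  return {
    sizeKB,
    stringifyMsPer100KB: (stringifyMs * 100) / sizeKB,
    parseMsPer100KB: (parseMs * 100) / sizeKB,
  };
}

// Illustrative payload; replace it with a real captured request body.
const samplePayload = {
  id: 'pay_0001',
  items: Array.from({ length: 40 }, (_, i) => ({
    sku: `SKU-${i}`, qty: i % 5, price: '199.00', note: 'standard delivery',
  })),
};
console.log(measureJsonCost(samplePayload));
```

Multiplying the per-100KB figures by (calls per second × payload KB / 100) gives the CPU-ms per second your own traffic spends in JSON.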


Protocol Buffers: Binary Serialization

Protocol Buffers (protobuf) is a binary serialization format. You define a schema in a .proto file, and a code generator produces type-safe encoder/decoder functions.

```protobuf
// transaction.proto
syntax = "proto3";

package blockchain;

message Transaction {
  bytes  hash         = 1; // 32 bytes fixed
  int64  block_height = 2;
  string sender       = 3;
  string recipient    = 4;
  string amount       = 5; // string to avoid int64 precision issues in JS
  int32  status       = 6; // enum: 0=pending, 1=confirmed, 2=failed
  int64  timestamp    = 7;
}

message TransactionBatch {
  repeated Transaction transactions = 1;
  int32 total = 2;
}
```
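Much of protobuf's size advantage comes from its wire format: field names never travel, each field is prefixed by a small numeric key ((field_number << 3) | wire_type), and integers use base-128 varints. The sketch below hand-encodes two fields of the Transaction message to make that visible. It is for illustration only (real code should use the generated encoders), and it assumes non-negative integers; negative int64 values are encoded differently in real protobuf and are not handled here.

```javascript
// Base-128 varint: 7 bits per byte, least-significant group first,
// high bit set on every byte except the last. Non-negative values only in this sketch.
function encodeVarint(value) {
  let v = BigInt(value);
  if (v < 0n) throw new RangeError('sketch handles non-negative values only');
  const bytes = [];
  do {
    let byte = Number(v & 0x7fn);
    v >>= 7n;
    if (v !== 0n) byte |= 0x80; // continuation bit: more bytes follow
    bytes.push(byte);
  } while (v !== 0n);
  return bytes;
}

// Field key = (field_number << 3) | wire_type. Wire type 0 = VARINT, 2 = LEN.
function encodeVarintField(fieldNumber, value) {
  return [...encodeVarint((fieldNumber << 3) | 0), ...encodeVarint(value)];
}

function encodeStringField(fieldNumber, str) {
  const utf8 = Buffer.from(str, 'utf8');
  return [...encodeVarint((fieldNumber << 3) | 2), ...encodeVarint(utf8.length), ...utf8];
}

// block_height = 150 (field 2) and amount = "1000" (field 5) from the Transaction schema
const encoded = Buffer.from([...encodeVarintField(2, 150), ...encodeStringField(5, '1000')]);
console.log(encoded); // <Buffer 10 96 01 2a 04 31 30 30 30>: 9 bytes, no field names
```

The equivalent minified JSON, {"block_height":150,"amount":"1000"}, is 36 bytes. Varints also shrink timestamps: a 13-digit millisecond timestamp takes 6 bytes instead of 13 ASCII characters.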

Generating Node.js code:

```bash
npm install -g @bufbuild/protoc-gen-es @bufbuild/buf
buf generate   # reads buf.gen.yaml; generates TypeScript types + encode/decode functions
```

Size comparison for a typical transaction:

| Format | Encoded size | Parse time (1K messages) |
|---|---|---|
| JSON (human-readable) | 340 bytes | 18ms |
| JSON (minified) | 220 bytes | 12ms |
| Protobuf binary | 82 bytes | 2ms |

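Protobuf is roughly 2.7–4x smaller (vs minified and human-readable JSON respectively) and 6–9x faster to parse. At 500K messages/second, this difference is:

  • JSON (minified): 110MB/sec on the wire + 6,000ms of CPU per second for parsing (about six cores)
  • Protobuf: 41MB/sec on the wire + 1,000ms of CPU per second for parsing (about one core)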

The bandwidth reduction matters for inter-datacenter links. The CPU reduction matters for everything.
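The bullet figures follow from the table by multiplying per-message cost by message rate (using the minified-JSON row). A small helper makes the arithmetic explicit and converts CPU-ms per second into cores; formatCost is an illustrative name, and the inputs are the table's benchmark numbers, not universal constants.

```javascript
// Back-of-envelope cost of a serialization format at a given message rate.
function formatCost({ msgPerSec, bytesPerMsg, parseMsPer1K }) {
  const parseCpuMsPerSec = (msgPerSec / 1000) * parseMsPer1K;
  return {
    mbPerSec: (msgPerSec * bytesPerMsg) / 1e6, // decimal megabytes on the wire per second
    parseCpuMsPerSec,                          // CPU time spent parsing, per wall-clock second
    coresForParsing: parseCpuMsPerSec / 1000,  // 1,000 CPU-ms per second = one fully busy core
  };
}

console.log('minified JSON:', formatCost({ msgPerSec: 500_000, bytesPerMsg: 220, parseMsPer1K: 12 }));
console.log('protobuf:', formatCost({ msgPerSec: 500_000, bytesPerMsg: 82, parseMsPer1K: 2 }));
```

Plugging in the human-readable row (340 bytes, 18ms) gives 170MB/sec and nine cores of parsing.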


gRPC: HTTP/2 + Protocol Buffers

gRPC is an RPC framework that uses HTTP/2 for transport and Protocol Buffers for serialization. It provides:

  • HTTP/2 multiplexing: multiple streams over a single TCP connection
  • Bidirectional streaming: client and server can both stream data simultaneously
  • Type safety: generated stub code handles serialization
  • Load balancing: built-in support for multiple backends
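Under the hood, gRPC sends each protobuf message on an HTTP/2 stream as a length-prefixed frame: a 1-byte compressed flag, a 4-byte big-endian length, then the message bytes (as described in the gRPC over HTTP/2 protocol specification). That prefix is what lets one stream carry many messages, which the streaming call types rely on; HTTP/2 itself multiplexes many streams over one connection. A minimal sketch with Node's Buffer; frameGrpcMessage and parseGrpcFrames are illustrative names, and @grpc/grpc-js does this framing for you.

```javascript
// gRPC length-prefixed message: 1-byte compressed flag + 4-byte big-endian length + bytes.
function frameGrpcMessage(messageBytes, compressed = false) {
  const frame = Buffer.alloc(5 + messageBytes.length);
  frame.writeUInt8(compressed ? 1 : 0, 0);
  frame.writeUInt32BE(messageBytes.length, 1);
  Buffer.from(messageBytes).copy(frame, 5);
  return frame;
}

// Split a buffer of concatenated frames back into messages.
// Assumes the buffer holds only whole frames (a real reader must also handle
// frames split across HTTP/2 DATA frames).
function parseGrpcFrames(buf) {
  const messages = [];
  let offset = 0;
  while (offset + 5 <= buf.length) {
    const compressed = buf.readUInt8(offset) === 1;
    const length = buf.readUInt32BE(offset + 1);
    const end = offset + 5 + length;
    if (end > buf.length) throw new Error('truncated gRPC frame');
    messages.push({ compressed, bytes: buf.subarray(offset + 5, end) });
    offset = end;
  }
  return messages;
}

console.log(frameGrpcMessage(Buffer.from([0x10, 0x96, 0x01]))); // <Buffer 00 00 00 00 03 10 96 01>
```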

The Four gRPC Call Types

```protobuf
// blockchain_indexer.proto
syntax = "proto3";

package blockchain; // the Node.js server below looks the service up as .blockchain

import "transaction.proto";

// Request/response messages (GetTransactionRequest, SubscribeRequest, Block, ...) omitted for brevity.

service BlockchainIndexer {
  // 1. Unary: single request, single response
  rpc GetTransaction (GetTransactionRequest) returns (Transaction);

  // 2. Server streaming: single request, multiple responses
  rpc SubscribeToBlocks (SubscribeRequest) returns (stream Block);

  // 3. Client streaming: multiple requests, single response
  rpc IngestTransactions (stream Transaction) returns (IngestResult);

  // 4. Bidirectional streaming: both sides stream
  rpc SyncBlocks (stream SyncRequest) returns (stream SyncResponse);
}
```

Implementing a Streaming gRPC Server in Node.js

```javascript
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

const packageDef = protoLoader.loadSync('blockchain_indexer.proto', {
  keepCase: true,
  longs: String, // int64 fields arrive as strings: no precision loss
  enums: String,
  defaults: true,
  oneofs: true,
});
const proto = grpc.loadPackageDefinition(packageDef).blockchain;

// blockEventBus, writeTransaction and getTransactionHandler are application code (not shown).

// Server streaming: push new blocks to every connected subscriber
function subscribeToBlocks(call) {
  // int64 values are strings here (longs: String), so compare as BigInt, not lexically
  const startHeight = BigInt(call.request.startHeight);

  const unsubscribe = blockEventBus.subscribe((block) => {
    if (BigInt(block.height) >= startHeight) {
      call.write(block); // the RPC streams Block messages, so write the Block itself
    }
  });

  call.on('cancelled', unsubscribe); // clean up when the client disconnects

  // Don't call call.end() here: the stream stays open until the client
  // cancels or the server decides to end it.
}

// Client streaming: receive transactions, reply once when the client finishes
async function ingestTransactions(call, callback) {
  const results = { ingested: 0, failed: 0 };
  try {
    // for await reads one message at a time (natural backpressure) and completes only
    // after the client half-closes, so every write has settled before we reply
    for await (const transaction of call) {
      try {
        await writeTransaction(transaction);
        results.ingested++;
      } catch {
        results.failed++;
      }
    }
    callback(null, results); // single response when the client finishes streaming
  } catch (err) {
    callback(err); // stream error or cancellation
  }
}

const server = new grpc.Server();
server.addService(proto.BlockchainIndexer.service, {
  getTransaction: getTransactionHandler,
  subscribeToBlocks,
  ingestTransactions,
});
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), (err, port) => {
  if (err) throw err;
  server.start(); // needed on older @grpc/grpc-js; newer versions start automatically
  console.log(`gRPC server running on :${port}`);
});
```

gRPC Client with Retry and Deadline

```javascript
const client = new proto.BlockchainIndexer(
  'indexer-service:50051',
  grpc.credentials.createInsecure(),
  {
    'grpc.keepalive_time_ms': 30_000,         // ping the server every 30s
    'grpc.keepalive_timeout_ms': 10_000,      // drop the connection if a ping is not acked in 10s
    'grpc.keepalive_permit_without_calls': 1, // keep pinging even with no active calls
  }
);

// Unary call with a deadline
function getTransaction(hash) {
  return new Promise((resolve, reject) => {
    const deadline = new Date(Date.now() + 5000); // 5-second deadline
    client.getTransaction({ hash }, { deadline }, (err, response) => {
      if (err) reject(err); // err.code is grpc.status.DEADLINE_EXCEEDED on timeout
      else resolve(response);
    });
  });
}

// Client streaming: send a batch of transactions over one stream, get one result back
function ingestBatch(transactions) {
  return new Promise((resolve, reject) => {
    const stream = client.ingestTransactions((err, result) => {
      if (err) reject(err);
      else resolve(result);
    });
    for (const tx of transactions) {
      stream.write(tx); // for very large batches, respect write()'s backpressure signal
    }
    stream.end(); // half-close: tells the server no more messages are coming
  });
}
```
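The client above sets a deadline but does not retry. One way to add retries is a small wrapper: the sketch below retries only on UNAVAILABLE (status code 14 in gRPC) with exponential backoff and full jitter. withRetry is a hypothetical helper, not a @grpc/grpc-js API; recent @grpc/grpc-js releases also accept a retryPolicy in the channel's service config, which may be preferable. Only wrap calls that are safe to repeat, such as reads.

```javascript
const GRPC_STATUS_UNAVAILABLE = 14; // same value as grpc.status.UNAVAILABLE

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Retry callFn on retryable errors, with exponential backoff and full jitter.
async function withRetry(callFn, {
  maxAttempts = 3,
  baseDelayMs = 100,
  isRetryable = (err) => err?.code === GRPC_STATUS_UNAVAILABLE,
} = {}) {
  if (maxAttempts < 1) throw new RangeError('maxAttempts must be >= 1');
  for (let attempt = 1; ; attempt++) {
    try {
      return await callFn(attempt);
    } catch (err) {
      if (attempt >= maxAttempts || !isRetryable(err)) throw err;
      // Wait a random time in [0, base * 2^(attempt-1)) before the next attempt
      await sleep(Math.random() * baseDelayMs * 2 ** (attempt - 1));
    }
  }
}
```

Used as withRetry(() => getTransaction(hash)), each attempt gets its own fresh 5-second deadline because getTransaction computes it per call, so the worst case is about three deadlines plus backoff; pass a shrinking deadline instead if the caller has a fixed overall budget.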

gRPC vs REST: Throughput Comparison

| Metric | REST + JSON | gRPC + Protobuf |
|---|---|---|
| Payload size (5KB object) | 5,000 bytes | ~850 bytes |
| Serialization (100K msg/sec) | ~500ms CPU | ~85ms CPU |
| Connections for 1K concurrent requests (one client) | 1K TCP sockets (HTTP/1.1) | 1 TCP socket (HTTP/2 multiplexing) |
| Latency (p99, same network) | 8ms | 2ms |
| Streaming | Workarounds (SSE, WebSocket) | Native (all 4 types) |

gRPC is the correct choice for inter-service communication at > 10K calls/second or when bidirectional streaming is required.


Kafka: Distributed Event Ledger

Kafka is a distributed log. Unlike a message queue (which deletes messages after consumption), Kafka retains messages for a configurable retention period. Each consumer group tracks its own offset (read position) in each partition of the log. Kafka does not push messages to consumers; consumers pull.
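The retention-plus-offsets model can be illustrated with a toy in-memory log. This is a sketch for intuition only: ToyLog is hypothetical, is not a Kafka client, and ignores partitions. Reading never deletes anything, each consumer group keeps its own position, and replay is just moving that position back.

```javascript
// Toy append-only log: retention + per-group offsets + pull-based reads.
class ToyLog {
  constructor() {
    this.messages = [];            // retained regardless of who has read them
    this.groupOffsets = new Map(); // groupId -> next offset to read
  }

  append(value) {
    this.messages.push(value);
    return this.messages.length - 1; // offset of the new message
  }

  // Consumers pull: read up to `max` messages from the group's current position.
  poll(groupId, max = 10) {
    const from = this.groupOffsets.get(groupId) ?? 0;
    return this.messages.slice(from, from + max).map((value, i) => ({ offset: from + i, value }));
  }

  // Commit = remember the next offset this group should read.
  commit(groupId, nextOffset) {
    this.groupOffsets.set(groupId, nextOffset);
  }

  // Replay: move a group back to any retained offset.
  seek(groupId, offset) {
    this.groupOffsets.set(groupId, offset);
  }
}
```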

Core Concepts

Topic: a named log, e.g. transactions, blocks, payment-events.

Partition: a topic is split into partitions for parallelism. Each partition is an ordered, immutable log. Within a partition, messages are ordered by offset. Across partitions, there is no ordering guarantee.

Consumer group: a set of consumers that collectively read a topic. Kafka assigns partitions to consumers — each partition is consumed by exactly one consumer in the group. If you have 8 partitions and 8 consumers: each consumer reads one partition. If you have 8 partitions and 4 consumers: each consumer reads two partitions.
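Partition assignment can be sketched as a pure function. The version below follows the range-style idea for a single topic: sorted members each take a contiguous block of partitions, and the first members take one extra when the division is uneven. It is an illustration, not kafkajs's own assigner (kafkajs uses round-robin by default, so real mappings differ), and assignPartitions is a hypothetical name. It also shows the case the text leaves implicit: consumers beyond the partition count get nothing and sit idle.

```javascript
// Simplified range-style assignment of one topic's partitions to group members.
function assignPartitions(numPartitions, memberIds) {
  const members = [...memberIds].sort();
  const assignment = Object.fromEntries(members.map((m) => [m, []]));
  const base = Math.floor(numPartitions / members.length);
  const extra = numPartitions % members.length; // first `extra` members get one more
  let next = 0;
  members.forEach((member, i) => {
    const count = base + (i < extra ? 1 : 0);
    for (let k = 0; k < count; k++) assignment[member].push(next++);
  });
  return assignment;
}

console.log(assignPartitions(8, ['c-1', 'c-2', 'c-3', 'c-4'])); // two partitions each
console.log(assignPartitions(3, ['a', 'b', 'c', 'd']));         // d gets no partitions
```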

```javascript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'blockchain-indexer',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-256',
    username: 'indexer',
    password: process.env.KAFKA_PASS,
  },
});
```

Producer: Batching and Compression

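```javascript
import { CompressionTypes } from 'kafkajs';

const producer = kafka.producer({
  maxInFlightRequests: 5,     // at most 5 requests in flight at once
  idempotent: true,           // broker drops duplicates caused by producer retries;
                              // full transactions additionally need a transactionalId
  transactionTimeout: 60_000, // only used by transactional producers
});
await producer.connect();

// High-throughput batch send.
// serializeProtobuf is an assumed helper wrapping the generated Transaction encoder.
async function publishTransactionBatch(transactions) {
  await producer.sendBatch({
    topicMessages: [{
      topic: 'transactions',
      messages: transactions.map((tx) => ({
        // Partition key: tx.hash spreads load evenly but gives no per-sender
        // ordering; key by tx.sender if one sender's events must stay ordered
        key: tx.hash,
        value: Buffer.from(serializeProtobuf(tx)), // protobuf encoding
        timestamp: tx.timestamp.toString(),        // milliseconds since epoch, as a string
      })),
    }],
    // kafkajs bundles only GZIP; LZ4 needs a codec registered in CompressionCodecs
    compression: CompressionTypes.LZ4,
    acks: -1, // wait for all in-sync replicas (durability); required with idempotent: true
  });
}
```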

Compression at 500K messages/second:

  • LZ4: 2–3x compression, minimal CPU cost. Best for throughput.
  • Snappy: 1.5–2x compression, very fast. Good default.
  • GZIP: 3–5x compression, high CPU. Use for storage efficiency, not throughput.

Consumer Group: Partition Assignment and Rebalancing

```javascript
const consumer = kafka.consumer({
  groupId: 'transaction-processor',
  sessionTimeout: 30_000,        // no heartbeat for 30s → broker considers the consumer dead
  heartbeatInterval: 3_000,      // send a heartbeat every 3s
  maxBytesPerPartition: 1048576, // at most 1MB per partition per fetch
});
await consumer.connect();
await consumer.subscribe({ topics: ['transactions'], fromBeginning: false });

// Fires each time this consumer (re)joins the group, i.e. after every rebalance
consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  // memberAssignment is an object such as { transactions: [0, 1] }
  console.log(`Joined group, assigned partitions: ${JSON.stringify(payload.memberAssignment)}`);
});

await consumer.run({
  partitionsConsumedConcurrently: 4, // process up to 4 partitions concurrently per consumer
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop on shutdown, or if a rebalance has made this batch stale
      if (!isRunning() || isStale()) break;

      const tx = deserializeProtobuf(message.value); // assumed helper
      await processTransaction(tx);                   // application code

      // Mark this offset as processed; kafkajs commits resolved offsets (autoCommit)
      resolveOffset(message.offset);

      // Heartbeat to prevent a session timeout during long batches
      await heartbeat();
    }
  },
});
```

Consumer Lag: The Critical Operational Metric

Consumer lag = (latest offset in topic) - (consumer's current offset).

  • Lag = 0: consumer is caught up.
  • Lag > 0 but stable: consumer is slightly behind (messages in flight) but keeping pace.
  • Lag growing: consumer cannot keep up with the producer rate.

```javascript
// Monitor consumer lag from within Node.js
const admin = kafka.admin();
await admin.connect();

async function getConsumerLag() {
  const offsets = await admin.fetchOffsets({
    groupId: 'transaction-processor',
    topics: ['transactions'],
  });
  // [{ partition, offset, high, low }]: offset/high is the end of each partition's log
  const topicOffsets = await admin.fetchTopicOffsets('transactions');

  let totalLag = 0;
  for (const partition of offsets[0].partitions) {
    const end = topicOffsets.find((t) => t.partition === partition.partition);
    // '-1' means the group has not committed anything for this partition yet
    const committed = partition.offset === '-1' ? Number(end.low) : Number(partition.offset);
    const lag = Number(end.offset) - committed;
    totalLag += lag;
    consumerLagGauge.labels(String(partition.partition)).set(lag); // e.g. a prom-client Gauge
  }
  return totalLag;
}

setInterval(async () => {
  try {
    const lag = await getConsumerLag();
    if (lag > 500_000) {
      logger.error(`CRITICAL: consumer lag ${lag}; add consumer instances (up to the partition count)`);
    }
  } catch (err) {
    logger.error(`Lag check failed: ${err.message}`); // avoid an unhandled rejection
  }
}, 10_000);
```
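The lag arithmetic in that loop can be pulled out into pure functions, which makes it testable without a broker. A sketch, assuming offsets shaped like the kafkajs admin results above (strings, with high and low watermarks per partition) and conservatively counting a group with no committed offset ('-1') as lagging from the low watermark; computePartitionLags and lagGrowthPerSec are hypothetical names. The second function quantifies "lag growing": its value is the producer rate minus the consumer rate.

```javascript
// committed:    [{ partition, offset }]   e.g. admin.fetchOffsets(...)[0].partitions
// topicOffsets: [{ partition, high, low }] e.g. admin.fetchTopicOffsets(topic)
function computePartitionLags(committed, topicOffsets) {
  const byPartition = new Map(topicOffsets.map((t) => [t.partition, t]));
  return committed.map(({ partition, offset }) => {
    const { high, low } = byPartition.get(partition);
    // '-1' = nothing committed yet; count everything still retained as unread
    const position = offset === '-1' ? Number(low) : Number(offset);
    return { partition, lag: Math.max(0, Number(high) - position) };
  });
}

// Slope of total lag between two samples, in messages/second:
// > 0 means producers outpace the group by that rate; < 0 means it is catching up.
function lagGrowthPerSec(prevTotalLag, currTotalLag, intervalMs) {
  return ((currTotalLag - prevTotalLag) * 1000) / intervalMs;
}
```

With the 10-second sampling interval above, a total lag that rises from 100,000 to 150,000 means the group is falling behind by 5,000 messages/second.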

Event Sourcing for Payment Ledgers

Event sourcing stores every state change as an immutable event, instead of storing current state. The current state is derived by replaying all events.

Traditional approach:
  Table: accounts { id, balance, updated_at }
  UPDATE accounts SET balance = balance - 100 WHERE id = 'acc-1'
  → Current state only. History is gone.

Event sourcing:
  Table: account_events { id, account_id, event_type, amount, timestamp }
  INSERT INTO account_events VALUES ('evt-1', 'acc-1', 'DEBIT', 100, NOW())
  → Full history. Balance = SUM(credits) - SUM(debits).

Why event sourcing for payment ledgers:

  1. Audit trail: every transaction is immutable. You can prove the exact sequence of operations that led to any balance.

  2. Replay: if you discover a bug in your balance calculation logic, replay all events with the fixed logic to get correct current balances.

  3. Temporal queries: what was account X's balance on March 15th? Replay events up to March 15th.

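  4. Kafka as the event store: Kafka's log retention makes it a natural event store, provided the ledger topic retains events indefinitely (retention.ms = -1) and is not log-compacted (cleanup.policy=delete), since time-based deletion or compaction would discard history.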
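Replay and temporal queries reduce to a fold over the event log. A minimal sketch, assuming the event shape used by the ledger code in this section (eventType, accountId, amount as a string, timestamp in milliseconds), amounts in integer minor units such as paise so BigInt holds them exactly, and events supplied in log order; balanceAt is a hypothetical helper.

```javascript
// Fold an account's events into a balance, optionally stopping at a point in time.
function balanceAt(events, accountId, asOfMs = Infinity) {
  let balance = 0n;
  for (const e of events) {
    if (e.accountId !== accountId || e.timestamp > asOfMs) continue;
    if (e.eventType === 'ACCOUNT_CREDITED') balance += BigInt(e.amount);
    else if (e.eventType === 'ACCOUNT_DEBITED') balance -= BigInt(e.amount);
  }
  return balance;
}
```

Fixing a bug in the balance logic means changing this function and re-running it over the same events; a temporal query is the same call with asOfMs set to the date of interest.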

```javascript
import crypto from 'node:crypto';

// Payment ledger event sourcing with Kafka.
// Amounts are integer minor units (e.g. paise), carried as strings.

// Write side: append events
async function debitAccount(accountId, amount, reference) {
  await producer.send({
    topic: 'payment-ledger',
    messages: [{
      key: accountId, // same account always goes to the same partition (ordering)
      value: JSON.stringify({
        eventType: 'ACCOUNT_DEBITED',
        eventId: crypto.randomUUID(),
        accountId,
        amount: amount.toString(),
        reference,
        timestamp: Date.now(),
      }),
    }],
    acks: -1, // wait for all in-sync replicas before confirming
  });
}

// Read side: query the projection maintained by the consumer below
async function getAccountBalance(accountId) {
  const { rows } = await pool.query(
    'SELECT balance FROM account_projections WHERE account_id = $1',
    [accountId]
  );
  return rows[0] ? BigInt(rows[0].balance) : 0n; // node-postgres returns NUMERIC/BIGINT as strings
}

// Consumer (its own group, subscribed to 'payment-ledger'): builds and maintains the projection.
// Delivery is at-least-once, so production code should also skip already-applied eventIds.
await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString());
    if (event.eventType === 'ACCOUNT_DEBITED') {
      await pool.query(
        `INSERT INTO account_projections (account_id, balance) VALUES ($1, 0 - $2::numeric)
         ON CONFLICT (account_id) DO UPDATE SET balance = account_projections.balance - $2::numeric`,
        [event.accountId, event.amount]
      );
    } else if (event.eventType === 'ACCOUNT_CREDITED') {
      await pool.query(
        `INSERT INTO account_projections (account_id, balance) VALUES ($1, $2::numeric)
         ON CONFLICT (account_id) DO UPDATE SET balance = account_projections.balance + $2::numeric`,
        [event.accountId, event.amount]
      );
    }
  },
});
```
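Because Kafka delivery is at-least-once, a message can be redelivered after a crash or rebalance if its offset was not yet committed, and a naive projection would then apply the same debit twice. One common remedy is to record applied event IDs and skip repeats. The sketch below does that in memory so it runs standalone (createProjection is hypothetical); in PostgreSQL the equivalent would be inserting the eventId into a table with a unique constraint inside the same transaction as the balance update.

```javascript
// Projection that applies each event at most once, keyed by eventId.
function createProjection() {
  const balances = new Map();        // accountId -> BigInt balance
  const appliedEventIds = new Set(); // eventIds already reflected in balances

  function apply(event) {
    if (appliedEventIds.has(event.eventId)) return false; // redelivery: skip
    const delta =
      event.eventType === 'ACCOUNT_CREDITED' ? BigInt(event.amount)
      : event.eventType === 'ACCOUNT_DEBITED' ? -BigInt(event.amount)
      : 0n;
    balances.set(event.accountId, (balances.get(event.accountId) ?? 0n) + delta);
    appliedEventIds.add(event.eventId);
    return true;
  }

  return { apply, balanceOf: (accountId) => balances.get(accountId) ?? 0n };
}
```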

Snapshots: Avoiding Full Replay

For accounts with millions of historical events, replaying from the beginning is impractical. Snapshots cache the balance at a point in time:

```javascript
// Snapshot the account balance; call this every 1,000 events for an account.
// A Kafka offset is per partition; that suffices because key = accountId keeps an
// account on one partition (as long as the partition count never changes).
async function snapshotAccount(accountId, balance, lastEventOffset) {
  await pool.query(
    `INSERT INTO account_snapshots (account_id, balance, kafka_offset, created_at)
     VALUES ($1, $2, $3, NOW())
     ON CONFLICT (account_id) DO UPDATE
       SET balance = $2, kafka_offset = $3, created_at = NOW()`,
    [accountId, balance.toString(), lastEventOffset]
  );
}

// Replay from snapshot + subsequent events only.
// rebuildFromBeginning and replayFromOffset are assumed helpers (not shown).
async function rebuildFromSnapshot(accountId) {
  const snapshot = await pool.query(
    'SELECT balance, kafka_offset FROM account_snapshots WHERE account_id = $1',
    [accountId]
  );
  if (!snapshot.rows[0]) return rebuildFromBeginning(accountId);

  const { balance, kafka_offset } = snapshot.rows[0];
  // Must replay only events AFTER kafka_offset (exclusive), or the last
  // snapshotted event is applied twice
  return replayFromOffset(accountId, kafka_offset, BigInt(balance));
}
```
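The invariant snapshots rely on is that the snapshot balance plus the events after the snapshot offset equals a full replay. A standalone sketch of that invariant, assuming each event carries its partition offset as a number and amounts are integer minor units; applyEvents, takeSnapshot and balanceFromSnapshot are hypothetical helpers, not the functions above.

```javascript
// Apply credits/debits to a starting balance (BigInt, integer minor units).
function applyEvents(startBalance, events) {
  let balance = startBalance;
  for (const e of events) {
    if (e.eventType === 'ACCOUNT_CREDITED') balance += BigInt(e.amount);
    else if (e.eventType === 'ACCOUNT_DEBITED') balance -= BigInt(e.amount);
  }
  return balance;
}

// A snapshot records the balance after every event up to and including `upToOffset`.
function takeSnapshot(events, upToOffset) {
  return {
    offset: upToOffset,
    balance: applyEvents(0n, events.filter((e) => e.offset <= upToOffset)),
  };
}

// Rebuild = snapshot balance + only the events strictly after the snapshot offset.
function balanceFromSnapshot(snapshot, events) {
  return applyEvents(snapshot.balance, events.filter((e) => e.offset > snapshot.offset));
}
```

Using > rather than >= in balanceFromSnapshot is the detail that prevents the last snapshotted event from being counted twice.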

RabbitMQ vs Kafka: When Each Is Correct

| | Kafka | RabbitMQ |
|---|---|---|
| Model | Pull-based log | Push-based queue |
| Message retention | Configurable (days/weeks) | Deleted after consumption |
| Replay | Yes: replay from any offset | No: consumed messages are gone |
| Throughput | 1M+ messages/sec | ~50K messages/sec |
| Ordering | Within partition | Per-queue (with single consumer) |
| Consumer groups | Built-in | Via competing consumers |
| Use case | Event sourcing, audit logs, stream processing | Task queues, RPC, work distribution |

For a blockchain indexer: Kafka. You need replay capability (debug consumer bugs), high throughput (500K tx/sec), and consumer groups for parallel processing.

For payment notification emails: RabbitMQ. You need per-message acknowledgement, dead letter queues, and retry logic. RabbitMQ gives at-least-once delivery rather than exactly-once, so the email consumer should be idempotent. Kafka can do this, but the configuration is more complex.


Production Incident: Kafka Partition Imbalance Causing Consumer Hotspot

Context: A payment event stream with 12 partitions, 12 consumer instances. Partition key was senderId. One major payment aggregator (a large e-commerce platform) sent 40% of all payment events. All events from this sender went to one partition.

What happened:

One consumer instance was processing 40% of total volume, and its consumer lag grew continuously. The other 11 instances handled roughly 5.5% each. Rebalancing couldn't help: the problem was the partition key, not the number of consumers.

The fix — composite partition key:

```javascript
import crypto from 'node:crypto';

// Before: senderId as partition key → all traffic from one sender → one partition
//   key: tx.senderId

// After: composite key (sender + time bucket) spreads one sender over many partitions
const key = crypto
  .createHash('sha256')
  .update(`${tx.senderId}-${tx.timestamp % 1000}`) // sender + one of 1,000 time buckets
  .digest('hex')
  .slice(0, 16);

// Same sender's events are spread across up to 1,000 distinct keys.
// Routing stays deterministic: the same (sender, bucket) key always maps to the same partition.
// No single partition receives 40% of traffic any more.
// Trade-off: one sender's events are no longer ordered relative to each other.
```
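The skew and its fix can be reproduced offline. The simulation below is a sketch with synthetic traffic (40% from one sender, 60% from 600 others) across 12 partitions. It uses FNV-1a as a stand-in for the murmur2 hash that Kafka's default partitioner applies to keys, so exact partition numbers differ from a real cluster, and it hashes the composite string directly instead of pre-hashing it with SHA-256; fnv1a, partitionFor and maxPartitionShare are illustrative names.

```javascript
// FNV-1a (32-bit): a simple stand-in for Kafka's murmur2 key hash.
function fnv1a(str) {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h;
}

const partitionFor = (key, numPartitions) => fnv1a(key) % numPartitions;

// Fraction of all events that land on the busiest partition.
function maxPartitionShare(events, keyFn, numPartitions = 12) {
  const counts = new Array(numPartitions).fill(0);
  for (const e of events) counts[partitionFor(keyFn(e), numPartitions)]++;
  return Math.max(...counts) / events.length;
}

// Synthetic traffic: 40% from one aggregator, 60% from 600 other senders.
const events = Array.from({ length: 100_000 }, (_, i) => ({
  senderId: i % 10 < 4 ? 'big-aggregator' : `sender-${i % 1000}`,
  timestamp: 1_700_000_000_000 + i * 7, // synthetic millisecond timestamps
}));

console.log('key = senderId:', maxPartitionShare(events, (e) => e.senderId).toFixed(3));
console.log('key = sender + bucket:', maxPartitionShare(events, (e) => `${e.senderId}-${e.timestamp % 1000}`).toFixed(3));
```

With senderId as the key, the busiest partition carries at least the aggregator's 40%; with the composite key, load spreads across all 12 partitions.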

Also added consumer lag alerting per partition (not just total lag) to detect hotspots early:

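```javascript
// partitionLags: [{ partition, lag }] collected per partition by the lag monitor
const avgLag = partitionLags.reduce((sum, p) => sum + p.lag, 0) / partitionLags.length;
for (const { partition, lag } of partitionLags) {
  if (lag > avgLag * 3) {
    logger.warn(`Partition ${partition} hotspot: lag ${lag} vs avg ${Math.round(avgLag)}`);
  }
}
```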

Summary

| Concept | Key Takeaway |
|---|---|
| Protobuf vs JSON | ~2.7–4x smaller, 6–9x faster to parse in the benchmark above. Critical at > 100K msg/sec. |
| gRPC over HTTP/2 | Multiplexed streams, 4 call types, native streaming support. ~4x lower p99 latency than REST in the comparison above. |
| gRPC server streaming | call.write() pushes to the client; call.on('cancelled') for cleanup. |
| Kafka retention | Messages persist after consumption. Replay from any retained offset. |
| Consumer groups | Partitions assigned to consumers. One partition → one consumer per group. |
| Consumer lag | Growing lag = consumer can't keep up. Scale consumers or optimize processing. |
| Partition key | High-cardinality composite keys prevent hotspots, at the cost of per-key ordering. Never use a skewed key. |
| acks: -1 | Wait for all in-sync replicas before confirming a publish. Required for durability. |
| LZ4 compression | Best throughput/CPU ratio. Use for high-volume streams. |
| Event sourcing | Immutable event log. Full history. Replay to any point in time. |
| Snapshots | Cache the balance every N events to avoid a full replay on every query. |
| Kafka vs RabbitMQ | Kafka for event streams/replay. RabbitMQ for task queues with ACK semantics. |

You now have the communication backbone. Module 11 covers how to keep the code clean as these systems grow — DDD applied to banking ledgers, dependency injection that lets you swap transports, and CQRS that separates the write model from the read model.

Next: Module 11 — Enterprise Architecture for State-Heavy Systems: DDD & Clean Architecture →

© 2026 Jatin Jain Saraf (JJS). All rights reserved.