Aggregates, invariants, Ports & Adapters, and CQRS applied to banking ledgers — keeping domain logic clean under extreme architectural complexity.
Module 11 — Enterprise Architecture for State-Heavy Systems: DDD & Clean Architecture
What this module covers: A blockchain indexer processing 50,000 events/second will accumulate complex business rules over time: transaction validity invariants, account balance constraints, settlement reconciliation logic, fraud detection rules. Without architectural discipline, these rules scatter across route handlers, database queries, and event listeners — untestable, fragile, and impossible to reason about under load. This module covers Domain-Driven Design applied to state-heavy systems, Clean Architecture for transport-agnostic domain logic, and CQRS for systems where the read model and write model need to evolve independently.
Why Architecture Matters More at Scale
At low throughput (< 1K events/sec), architectural shortcuts are invisible. Validation in route handlers, business logic in SQL queries, domain concepts scattered across layers — these are maintainability problems, not performance problems.
At high throughput, architectural shortcuts become performance problems:
javascript// BAD: business logic in the route handler app.post('/api/v2/payments', async (req, res) => { const { senderId, recipientId, amount } = req.body; // Business logic in HTTP handler: const sender = await db.query('SELECT balance FROM accounts WHERE id = $1', [senderId]); if (!sender.rows[0]) return res.status(404).json({ error: 'Sender not found' }); if (sender.rows[0].balance < amount) return res.status(400).json({ error: 'Insufficient funds' }); // More business logic: const fraud = await checkFraud(senderId, amount); if (fraud.risk > 0.8) return res.status(403).json({ error: 'High risk transaction' }); // More business logic: await db.query('BEGIN'); await db.query('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [amount, senderId]); await db.query('UPDATE accounts SET balance = balance + $1 WHERE id = $2', [amount, recipientId]); await db.query('INSERT INTO transactions VALUES ...', [...]); await db.query('COMMIT'); res.json({ status: 'accepted' }); });
Problems:
- Untestable: you need a running HTTP server and database to test the business logic
- Uncacheable: the business rules are coupled to the SQL and HTTP transport
- Unreusable: the same logic cannot be reused by a gRPC endpoint or a Kafka consumer
- Unrefactorable: changing the validation requires touching the route handler
Domain-Driven Design Core Concepts
DDD provides vocabulary and patterns for modeling complex business domains in code. For a payment system, the key concepts are:
Entities
Objects with identity that persists over time. An account is an entity — it has an ID, and its state changes.
typescript// modules/accounts/domain/Account.ts export class Account { #id: string; #balance: bigint; #status: 'active' | 'frozen' | 'closed'; #events: DomainEvent[] = []; private constructor(id: string, balance: bigint, status: 'active' | 'frozen' | 'closed') { this.#id = id; this.#balance = balance; this.#status = status; } static create(id: string, initialBalance: bigint): Account { const account = new Account(id, initialBalance, 'active'); account.#events.push(new AccountCreatedEvent(id, initialBalance)); return account; } static reconstitute(id: string, balance: bigint, status: 'active' | 'frozen' | 'closed'): Account { return new Account(id, balance, status); } debit(amount: bigint, reference: string): void { // Invariant enforcement — IN the domain object, not in application code if (this.#status !== 'active') { throw new AccountNotActiveError(this.#id, this.#status); } if (amount <= 0n) { throw new InvalidAmountError('Debit amount must be positive'); } if (this.#balance < amount) { throw new InsufficientFundsError(this.#id, this.#balance, amount); } this.#balance -= amount; this.#events.push(new AccountDebitedEvent(this.#id, amount, reference)); } credit(amount: bigint, reference: string): void { if (this.#status === 'closed') { throw new AccountClosedError(this.#id); } if (amount <= 0n) throw new InvalidAmountError('Credit amount must be positive'); this.#balance += amount; this.#events.push(new AccountCreditedEvent(this.#id, amount, reference)); } freeze(): void { if (this.#status !== 'active') throw new AccountNotActiveError(this.#id, this.#status); this.#status = 'frozen'; this.#events.push(new AccountFrozenEvent(this.#id)); } get id() { return this.#id; } get balance() { return this.#balance; } get status() { return this.#status; } get events() { return [...this.#events]; } clearEvents() { this.#events = []; } }
Why this matters for high-throughput systems:
The invariants are in the domain object and enforced before any I/O happens. At 50K payments/second, you want to reject invalid payments with zero database roundtrips. The Account.debit() method checks balance without touching the database — if it throws, the payment is rejected before any SQL runs.
Value Objects
Immutable objects without identity. An Amount is a value object — two amounts of 100 USD are identical regardless of which "instance" they are.
typescriptexport class Money { readonly amount: bigint; readonly currency: string; constructor(amount: bigint, currency: string) { if (amount < 0n) throw new Error('Money cannot be negative'); if (!['USD', 'INR', 'USDC', 'ETH'].includes(currency)) { throw new Error(`Unsupported currency: ${currency}`); } this.amount = amount; this.currency = currency; } add(other: Money): Money { if (other.currency !== this.currency) throw new Error('Currency mismatch'); return new Money(this.amount + other.amount, this.currency); } subtract(other: Money): Money { if (other.currency !== this.currency) throw new Error('Currency mismatch'); if (other.amount > this.amount) throw new InsufficientFundsError(); return new Money(this.amount - other.amount, this.currency); } equals(other: Money): boolean { return this.amount === other.amount && this.currency === other.currency; } }
Aggregates: The Consistency Boundary
An aggregate is a cluster of entities and value objects that must be changed together atomically. The aggregate root is the entry point — external code can only access the aggregate through the root.
For a UPI payment:
typescript// The Payment aggregate — entry point for all payment operations export class Payment { #id: string; #sender: Account; #recipient: Account; #amount: Money; #status: 'pending' | 'processing' | 'completed' | 'failed'; #events: DomainEvent[] = []; // Factory method — validates creation invariants static initiate(sender: Account, recipient: Account, amount: Money): Payment { if (sender.id === recipient.id) throw new SelfTransferError(); if (sender.status !== 'active') throw new AccountNotActiveError(sender.id, sender.status); if (sender.balance < amount.amount) throw new InsufficientFundsError(sender.id, sender.balance, amount.amount); const payment = new Payment(generateId(), sender, recipient, amount, 'pending'); payment.#events.push(new PaymentInitiatedEvent(payment.#id, sender.id, recipient.id, amount)); return payment; } process(): void { if (this.#status !== 'pending') throw new InvalidStateTransitionError('process', this.#status); this.#sender.debit(this.#amount.amount, this.#id); this.#recipient.credit(this.#amount.amount, this.#id); this.#status = 'completed'; this.#events.push(new PaymentCompletedEvent(this.#id)); } fail(reason: string): void { this.#status = 'failed'; this.#events.push(new PaymentFailedEvent(this.#id, reason)); } }
Clean Architecture: The Dependency Rule
Clean Architecture enforces that dependencies only point inward — from infrastructure to application to domain. The domain never knows about HTTP, Kafka, or PostgreSQL.
┌─────────────────────────────────────────┐
│ Infrastructure Layer │
│ (HTTP handlers, Kafka consumers, │
│ PostgreSQL repositories, Redis cache) │
│ │
│ ┌───────────────────────────────────┐ │
│ │ Application Layer │ │
│ │ (Use cases: ProcessPayment, │ │
│ │ GetAccountBalance, etc.) │ │
│ │ │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Domain Layer │ │ │
│ │ │ (Account, Payment, Money, │ │ │
│ │ │ domain events, errors) │ │ │
│ │ │ NO external dependencies │ │ │
│ │ └─────────────────────────────┘ │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────┘
Dependencies flow: Infrastructure → Application → Domain
Domain knows nothing about infrastructure.
Ports and Adapters (Hexagonal Architecture)
typescript// Domain layer: defines the PORT (interface) // ports/AccountRepository.ts export interface AccountRepository { findById(id: string): Promise<Account | null>; save(account: Account): Promise<void>; } // Application layer: uses the PORT, knows nothing about PostgreSQL // use-cases/ProcessPayment.ts export class ProcessPaymentUseCase { constructor( private accounts: AccountRepository, // injected via DI private events: DomainEventPublisher, ) {} async execute(senderId: string, recipientId: string, amount: Money): Promise<Payment> { const [sender, recipient] = await Promise.all([ this.accounts.findById(senderId), this.accounts.findById(recipientId), ]); if (!sender) throw new AccountNotFoundError(senderId); if (!recipient) throw new AccountNotFoundError(recipientId); const payment = Payment.initiate(sender, recipient, amount); payment.process(); await this.accounts.save(sender); // delegate to repository await this.accounts.save(recipient); // Publish domain events for (const event of payment.events) { await this.events.publish(event); } return payment; } }
typescript// Infrastructure layer: implements the PORT for PostgreSQL (the ADAPTER) // adapters/PostgresAccountRepository.ts import { Account } from '../../domain/Account'; import { AccountRepository } from '../../ports/AccountRepository'; export class PostgresAccountRepository implements AccountRepository { constructor(private pool: Pool) {} async findById(id: string): Promise<Account | null> { const { rows } = await this.pool.query( 'SELECT id, balance, status FROM accounts WHERE id = $1', [id] ); if (!rows[0]) return null; return Account.reconstitute(rows[0].id, BigInt(rows[0].balance), rows[0].status); } async save(account: Account): Promise<void> { await this.pool.query( 'UPDATE accounts SET balance = $1, status = $2 WHERE id = $3', [account.balance.toString(), account.status, account.id] ); } }
The transport-agnostic domain: the same ProcessPaymentUseCase works whether triggered by:
typescript// HTTP trigger fastify.post('/api/v2/payments', async (req) => { return useCase.execute(req.body.senderId, req.body.recipientId, new Money(...)); }); // Kafka consumer trigger consumer.run({ eachMessage: async ({ message }) => { const { senderId, recipientId, amount } = JSON.parse(message.value); return useCase.execute(senderId, recipientId, new Money(...)); }}); // gRPC trigger function processPayment(call, callback) { useCase.execute(call.request.senderId, ...) .then(result => callback(null, result)) .catch(err => callback(err)); }
The domain and application layers are untouched. Only the infrastructure adapter changes.
Dependency Injection: Wiring the Layers
typescript// composition-root/container.ts — wire everything together at startup import { Pool } from 'pg'; import { Kafka } from 'kafkajs'; import { PostgresAccountRepository } from '../adapters/PostgresAccountRepository'; import { KafkaEventPublisher } from '../adapters/KafkaEventPublisher'; import { ProcessPaymentUseCase } from '../use-cases/ProcessPaymentUseCase'; export function buildContainer() { const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const kafka = new Kafka({ brokers: process.env.KAFKA_BROKERS!.split(',') }); const producer = kafka.producer(); const accountRepo = new PostgresAccountRepository(pool); const eventPublisher = new KafkaEventPublisher(producer); const processPayment = new ProcessPaymentUseCase(accountRepo, eventPublisher); return { processPayment, pool, producer }; }
Swap PostgreSQL for an in-memory repository for unit tests without touching any business logic:
typescript// tests/use-cases/ProcessPayment.test.ts import { InMemoryAccountRepository } from '../test-doubles/InMemoryAccountRepository'; import { SpyEventPublisher } from '../test-doubles/SpyEventPublisher'; describe('ProcessPaymentUseCase', () => { const accounts = new InMemoryAccountRepository(); const events = new SpyEventPublisher(); const useCase = new ProcessPaymentUseCase(accounts, events); beforeEach(() => { accounts.seed(Account.reconstitute('sender-1', 10000n, 'active')); accounts.seed(Account.reconstitute('recipient-1', 0n, 'active')); }); it('debits sender and credits recipient', async () => { await useCase.execute('sender-1', 'recipient-1', new Money(5000n, 'INR')); const sender = await accounts.findById('sender-1'); const recipient = await accounts.findById('recipient-1'); expect(sender!.balance).toBe(5000n); expect(recipient!.balance).toBe(5000n); }); it('rejects payment when balance is insufficient', async () => { await expect( useCase.execute('sender-1', 'recipient-1', new Money(20000n, 'INR')) ).rejects.toThrow(InsufficientFundsError); }); });
Tests run in milliseconds. No database. No network. Business logic tested in isolation.
CQRS: Separate Write and Read Models
Command Query Responsibility Segregation (CQRS) separates the model that handles writes (commands) from the model that handles reads (queries).
For a payment ledger:
- Write model: normalized, ACID-consistent. Enforces invariants. Optimized for integrity.
- Read model: denormalized, eventually consistent. Pre-computed aggregations. Optimized for query speed.
typescript// Write side: command handler (uses the rich domain model) class DebitAccountCommandHandler { async handle(cmd: DebitAccountCommand): Promise<void> { const account = await this.accountRepo.findById(cmd.accountId); account.debit(cmd.amount, cmd.reference); // invariant enforcement await this.accountRepo.save(account); await this.eventBus.publish(account.events); } } // Read side: query handler (uses simple, fast projections) class GetAccountSummaryQueryHandler { async handle(query: GetAccountSummaryQuery): Promise<AccountSummary> { // Read from pre-computed projection table — no domain model needed const { rows } = await this.readPool.query( `SELECT account_id, balance, total_debited_30d, total_credited_30d, transaction_count_30d, last_transaction_at FROM account_summaries WHERE account_id = $1`, [query.accountId] ); return rows[0]; } }
The read model is updated by an event consumer that processes domain events from Kafka:
typescript// Projection builder: maintains the read model consumer.run({ eachMessage: async ({ message }) => { const event = JSON.parse(message.value.toString()); if (event.type === 'ACCOUNT_DEBITED') { await readPool.query( `INSERT INTO account_summaries (account_id, balance, total_debited_30d, transaction_count_30d, last_transaction_at) VALUES ($1, -$2, $2, 1, NOW()) ON CONFLICT (account_id) DO UPDATE SET balance = account_summaries.balance - $2, total_debited_30d = account_summaries.total_debited_30d + $2, transaction_count_30d = account_summaries.transaction_count_30d + 1, last_transaction_at = NOW()`, [event.accountId, BigInt(event.amount)] ); } } });
CQRS benefits for high-throughput systems:
- Write and read databases can be scaled independently
- Read queries never compete with write transactions
- The read model can be optimized for specific query patterns (denormalized, indexed differently)
- If the read model becomes stale, you can rebuild it by replaying events from Kafka
Anti-Corruption Layer for Blockchain RPC
When your indexer calls a third-party blockchain RPC node (Ethereum's eth_getBlockByNumber, Supra's block API), the response structure is the external system's format. Letting that format leak into your domain creates coupling — if the RPC response format changes, your domain objects break.
typescript// External world: raw Ethereum RPC response interface EthRpcBlock { number: string; // hex string: "0x11A3E70" hash: string; // 0x-prefixed hex transactions: Array<{ hash: string; from: string; to: string | null; value: string; // hex string in wei gas: string; gasPrice: string; input: string; }>; } // Anti-corruption layer: translate to your domain model function translateEthBlock(raw: EthRpcBlock): Block { return new Block({ height: parseInt(raw.number, 16), // hex → number hash: Buffer.from(raw.hash.slice(2), 'hex'), // hex string → Buffer transactions: raw.transactions.map(tx => new Transaction({ hash: Buffer.from(tx.hash.slice(2), 'hex'), sender: tx.from, recipient: tx.to ?? null, amount: BigInt(tx.value), // hex string → BigInt gasUsed: parseInt(tx.gas, 16), })), }); }
The domain Block object uses your types (Buffer for hashes, BigInt for amounts). If Ethereum changes their API response format, you update translateEthBlock — nowhere else.
Production Incident: Domain Logic in a SQL Query
Context: A banking ledger service. Balance calculations were done in SQL via a stored procedure called by the application:
sql-- The stored procedure CREATE FUNCTION calculate_balance(p_account_id UUID) RETURNS NUMERIC AS $$ SELECT SUM(CASE WHEN transaction_type = 'CREDIT' THEN amount ELSE -amount END) FROM transactions WHERE account_id = p_account_id AND status = 'settled'; $$ LANGUAGE SQL;
What happened:
A regulatory requirement changed: transactions pending for more than 48 hours must be included in the balance calculation as "provisional debits." The business rule changed from:
Balance = sum of settled transactions
To:
Balance = sum of settled transactions + sum of pending debits older than 48 hours
The stored procedure was updated. But the update missed that pending credits older than 48 hours should also be included (oversight). The bug was in production for 11 days before a reconciliation audit caught it. 847 accounts had incorrect balance calculations.
Root cause: the business rule was in SQL, not in domain code. SQL is not testable with unit tests. The domain rule was invisible to developers who tested in isolation.
The fix — move the rule to the domain:
typescriptclass Account { calculateEffectiveBalance(asOf: Date): bigint { const PROVISIONAL_THRESHOLD_MS = 48 * 60 * 60 * 1000; return this.#transactions.reduce((balance, tx) => { if (tx.status === 'settled') { return tx.type === 'CREDIT' ? balance + tx.amount : balance - tx.amount; } // Provisional: pending transactions older than 48 hours if (tx.status === 'pending' && (asOf.getTime() - tx.createdAt.getTime()) > PROVISIONAL_THRESHOLD_MS) { return tx.type === 'CREDIT' ? balance + tx.amount : balance - tx.amount; } return balance; }, 0n); } } // Unit test catches the bug immediately it('includes 48-hour pending credits as provisional', () => { const account = buildAccount([ creditTx({ amount: 10000n, status: 'settled' }), creditTx({ amount: 5000n, status: 'pending', createdAt: daysAgo(3) }), // 72h old debitTx({ amount: 2000n, status: 'pending', createdAt: daysAgo(1) }), // 24h old, NOT included ]); expect(account.calculateEffectiveBalance(new Date())).toBe(15000n); // settled + 72h credit });
The test caught the exact bug — credits and debits treated consistently — before any code reached production.
Summary
| Concept | Key Takeaway |
|---|---|
| Entity | Object with identity that changes over time. Enforces its own invariants. |
| Value Object | Immutable, identity-less. Money(5000, 'INR') is always equal to another Money(5000, 'INR'). |
| Aggregate | Cluster of entities with one root. Changed atomically. Emits domain events. |
| Invariant | Rule the domain enforces. Never in SQL. Always in domain code. Testable without I/O. |
| Clean Architecture | Domain → Application → Infrastructure. Dependencies flow inward only. |
| Ports and Adapters | Port = interface the domain defines. Adapter = infrastructure implementation. Swappable. |
| DI container | Wire adapters to ports at startup. Swap for test doubles in tests. |
| CQRS | Write model: normalized, strict, ACID. Read model: denormalized, eventual, fast. |
| Anti-corruption layer | Translate external formats at the boundary. External changes never reach domain objects. |
| Domain events | Emitted by aggregates after state changes. Published to Kafka by the infrastructure layer. |
The architecture is clean. Module 12 covers how to keep it healthy under production load — CPU profiling, flame graph reading, ELU monitoring, and the full clinic.js diagnostic workflow for Node.js services under high-throughput stress.
Next: Module 12 — Production Observability, Performance Profiling & Flame Graphs →