When You Actually Need Sharding
Let me be clear: sharding is a last resort. It adds enormous complexity to your application, makes cross-entity queries painful, and turns simple operations like "give me all transactions from last month" into distributed coordination problems. Before you shard, exhaust these options first:
- Vertical scaling — bigger machine, more RAM, faster NVMe. This gets you surprisingly far.
- Read replicas — offload reporting and analytics to replicas. We covered this in a previous article.
- Table partitioning — range-partition by created_at to speed up time-bounded queries and VACUUM.
- Connection pooling — PgBouncer in transaction mode can 10x your effective connection capacity.
We did all of these. They bought us about 18 months. But when your write volume consistently exceeds what a single primary can handle — for us, that was around 50K TPS with our transaction schema — you need to split the writes across multiple databases.
Choosing a Shard Key
This is the single most important decision you'll make. Get it wrong and you'll either have hot shards (all traffic hitting one node) or impossible cross-shard queries. For payment data, there are three realistic options:
Option 1: merchant_id
This is what we chose. Most payment operations are scoped to a single merchant: create a charge, list transactions, generate a settlement report. With merchant_id as the shard key, all of a merchant's data lives on one shard, which means these operations never need cross-shard coordination.
The downside is whale merchants. If one merchant does 40% of your volume, their shard becomes a hot spot. We handle this with a separate "whale tier" — large merchants get their own dedicated shard.
Option 2: transaction_id
Distributes writes perfectly evenly (assuming UUIDs or snowflake IDs). But now every merchant-scoped query — "show me all transactions for merchant X" — becomes a scatter-gather across all shards. For a payment platform where merchants constantly check their dashboards, this is a non-starter.
Option 3: Composite (merchant_id + date range)
Shard by merchant, sub-partition by month within each shard. This gives you the best of both worlds but doubles your operational complexity. We considered this but decided the juice wasn't worth the squeeze at our scale.
Application-Level Routing in Go
We went with application-level sharding rather than middleware like Vitess or Citus. The reasoning: we wanted full control over shard routing logic, especially for handling whale merchants and gradual migration. Here's the core routing logic:
```go
package shard

import (
	"context"
	"database/sql"
	"fmt"
	"hash/fnv"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

type ShardRouter struct {
	shards    []*sql.DB
	numShards uint32
	overrides map[string]int // whale merchant -> dedicated shard
}

func NewShardRouter(dsns []string, overrides map[string]int) (*ShardRouter, error) {
	shards := make([]*sql.DB, len(dsns))
	for i, dsn := range dsns {
		db, err := sql.Open("postgres", dsn)
		if err != nil {
			return nil, fmt.Errorf("shard %d: %w", i, err)
		}
		shards[i] = db
	}
	return &ShardRouter{
		shards:    shards,
		numShards: uint32(len(dsns)),
		overrides: overrides,
	}, nil
}

func (r *ShardRouter) ShardFor(merchantID string) *sql.DB {
	// Check whale overrides first.
	if idx, ok := r.overrides[merchantID]; ok {
		return r.shards[idx]
	}
	// Hash-based routing for everyone else.
	h := fnv.New32a()
	h.Write([]byte(merchantID))
	idx := h.Sum32() % r.numShards
	return r.shards[idx]
}

func (r *ShardRouter) CreateTransaction(ctx context.Context, tx Transaction) error {
	db := r.ShardFor(tx.MerchantID)
	_, err := db.ExecContext(ctx,
		`INSERT INTO transactions (id, merchant_id, amount, currency, status, created_at)
		 VALUES ($1, $2, $3, $4, $5, $6)`,
		tx.ID, tx.MerchantID, tx.Amount, tx.Currency, tx.Status, tx.CreatedAt,
	)
	return err
}
```
Why FNV32 over SHA256? For shard routing, you need speed, not cryptographic security. FNV32a hashes a UUID in ~15ns vs ~200ns for SHA256. When you're routing 200K requests per second, that difference matters.
The Cross-Shard Reporting Problem
The moment you shard, your finance team's favorite query — "total settlement amount across all merchants for March" — becomes a distributed aggregation. We solved this with a dedicated read-only aggregation layer:
- CDC (Change Data Capture) streams from each shard using Debezium, publishing transaction events to Kafka.
- An aggregation service consumes these events and writes denormalized summaries into a single ClickHouse instance.
- Reporting queries hit ClickHouse, not the sharded PostgreSQL instances.
This adds latency — reports are eventually consistent, typically 5-15 seconds behind real-time. For settlement reports that run nightly, this is perfectly fine. For real-time merchant dashboards, we query the specific merchant's shard directly (which is fast, since all their data is co-located).
Rebalancing When a Whale Shows Up
Three months after launch, a marketplace merchant onboarded and immediately accounted for 30% of our transaction volume. Their shard's CPU was pegged at 90% while the other three sat at 25%. We needed to move them to a dedicated shard without downtime.
The process:
- Provision a new shard (shard 4) and add it to the router config with zero merchants assigned.
- Enable dual-write: new transactions for the whale merchant go to both the old shard and shard 4.
- Backfill historical data from the old shard to shard 4 using a background migration job.
- Once backfill catches up and dual-write has been running for 24 hours, flip the override: overrides["whale_merchant_id"] = 4.
- Stop dual-write. Archive the whale's data from the old shard after a grace period.
Warning: Never delete data from the source shard until you've verified every single row exists on the destination. We checksummed row counts and amount totals per day before cutting over. One off-by-one in the backfill date range would have meant lost transactions.
The Zero-Downtime Migration
Moving from a single database to four shards while processing live payments was the most stressful week of my career. Here's the playbook:
- Phase 1: Shadow writes. Application writes to the original DB as primary. A background goroutine also writes to the target shard. We compared results but didn't rely on the shards yet.
- Phase 2: Dual reads. Read from both the original DB and the shard. Log any discrepancies. This ran for 72 hours with zero mismatches before we trusted it.
- Phase 3: Flip primary. Shards become the primary write target. Original DB receives shadow writes as a safety net.
- Phase 4: Decommission. After two weeks with shards as primary and zero issues, we stopped writing to the original DB.
The whole migration took about 6 weeks from first shadow write to full decommission. We processed over $2B in transactions during that period without a single data inconsistency.
Lessons Learned
- Start with fewer shards than you think you need. We started with 4. Adding shards is easier than managing too many from day one. Each shard is another database to monitor, back up, and maintain.
- Build shard-aware tooling early. Your existing admin scripts, migration tools, and monitoring dashboards all assume a single database. Budget time to update them.
- Test with production-like data distribution. Our staging environment had evenly distributed merchants. Production had a power-law distribution. The hot shard problem only showed up in prod.
- Keep a global ID registry. We maintain a lightweight table mapping transaction_id → shard_id in a separate small database. This lets us look up any transaction without knowing which merchant it belongs to — essential for support tickets and dispute resolution.
References
- PostgreSQL Documentation — Table Partitioning
- Vitess Documentation — Database Clustering for Horizontal Scaling
- Citus — Distributed PostgreSQL Extension
- Designing Data-Intensive Applications — Martin Kleppmann (O'Reilly)
- Debezium Documentation — Change Data Capture
Disclaimer: This article reflects the author's personal experience and opinions. Product names, logos, and brands are property of their respective owners. Architecture decisions depend heavily on your specific workload — always benchmark with your own data before committing to a sharding strategy.