Scaling¶
Guide to scaling projection processing across multiple workers.
Table of Contents¶
- Horizontal Scaling
- Partitioning Strategy
- Running Multiple Projections
- Performance Tuning
- Production Patterns
- Advanced Topics
Horizontal Scaling¶
Scale projection processing by distributing work across multiple workers.
When to Scale¶
Indicators:
- Projection lag increasing over time
- Event processing unable to keep pace with write rate
- Single worker resource constraints (CPU/memory)
- High-latency operations in projection handlers
When Not to Scale:
- Stable lag with adequate headroom
- Low event volume (< 100 events/second)
- Fast projection logic (< 1ms per event)
- Single worker not yet optimized (tune batch size first)
Example Calculation:
System generating 10,000 events/hour with per-event cost:
- Database insert: 2ms
- External API call: 50ms
- Cache invalidation: 1ms
- Total: ~53ms per event
Throughput: ~18.9 events/second per worker
Required capacity: ~2.78 events/second (10,000/hour)
Workers needed: 1 (with headroom)
For 100,000 events/hour (~27.8 events/second): at least 2 workers are required; use 4 for margin.
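The same sizing arithmetic can be scripted when planning capacity; a minimal sketch in Go (the helper name, the 50% headroom factor, and the example inputs are illustrative, not part of the library):

package main

import (
	"fmt"
	"math"
)

// workersNeeded estimates how many workers keep up with the event rate while
// leaving spare capacity via the headroom factor (0.5 = run workers at ~50% utilization).
func workersNeeded(eventsPerHour, perEventCostMs, headroom float64) int {
	perWorker := 1000.0 / perEventCostMs // events/second one worker can process
	required := eventsPerHour / 3600.0   // events/second the system produces
	return int(math.Ceil(required / (perWorker * headroom)))
}

func main() {
	fmt.Println(workersNeeded(10_000, 53, 0.5))  // 1
	fmt.Println(workersNeeded(100_000, 53, 0.5)) // 3
}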
Architecture¶
Event Stream → Partition Assignment → Worker Pool
Event A → Partition 0 → Worker 0
Event B → Partition 1 → Worker 1
Event C → Partition 2 → Worker 2
Event D → Partition 3 → Worker 3
Mechanism:
1. Events assigned to partitions via hash(aggregate_id)
2. Each worker processes its assigned partition(s)
3. Workers operate independently without coordination
4. Single checkpoint per projection (not per partition)
Hash-Based Partitioning¶
config := projection.DefaultProcessorConfig()
config.PartitionKey = 0 // This worker (0-indexed)
config.TotalPartitions = 4 // Total worker count
processor := postgres.NewProcessor(db, store, &config)
Characteristics:
- Deterministic assignment: same aggregate → same partition
- Even distribution across workers
- No coordination overhead
- Maintains per-aggregate ordering
Ordering Guarantees¶
✅ Within Aggregate - Events for same aggregate processed in order
✅ Deterministic Assignment - Same aggregate always routes to same partition
✅ Even Distribution - Approximately equal load per partition
❌ Cross-Aggregate - No global ordering between different aggregates
Scaling Patterns¶
Pattern 1: Separate Processes¶
Run the same binary multiple times with different partition keys:
# Terminal 1
PARTITION_KEY=0 TOTAL_PARTITIONS=4 ./myapp process-projections
# Terminal 2
PARTITION_KEY=1 TOTAL_PARTITIONS=4 ./myapp process-projections
# Terminal 3
PARTITION_KEY=2 TOTAL_PARTITIONS=4 ./myapp process-projections
# Terminal 4
PARTITION_KEY=3 TOTAL_PARTITIONS=4 ./myapp process-projections
See partitioned example for details.
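Inside the binary, each process maps those environment variables onto its processor configuration; a minimal sketch of that wiring (the variable names are illustrative, the config fields match the Hash-Based Partitioning example above):

// Read partition assignment from the environment (PARTITION_KEY / TOTAL_PARTITIONS)
partitionKey, err := strconv.Atoi(os.Getenv("PARTITION_KEY"))
if err != nil {
	log.Fatalf("invalid PARTITION_KEY: %v", err)
}
totalPartitions, err := strconv.Atoi(os.Getenv("TOTAL_PARTITIONS"))
if err != nil {
	log.Fatalf("invalid TOTAL_PARTITIONS: %v", err)
}

config := projection.DefaultProcessorConfig()
config.PartitionKey = partitionKey
config.TotalPartitions = totalPartitions
processor := postgres.NewProcessor(db, store, &config)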
Pattern 2: Worker Pool (Single Process)¶
Run multiple partitions in the same process using goroutines:
import "github.com/getpup/pupsourcing/es/projection/runner"
store := postgres.NewStore(postgres.DefaultStoreConfig())
// Create 4 processors with different partition keys, all feeding one shared projection instance
proj := &UserCounterProjection{} // shared across workers, so it must be thread-safe (see warning below)
runners := make([]runner.ProjectionRunner, 4)
for i := 0; i < 4; i++ {
    config := projection.DefaultProcessorConfig()
    config.PartitionKey = i
    config.TotalPartitions = 4
    processor := postgres.NewProcessor(db, store, &config)
    runners[i] = runner.ProjectionRunner{
        Projection: proj,
        Processor:  processor,
    }
}
// Run all partitions concurrently
r := runner.New()
err := r.Run(ctx, runners)
⚠️ Thread Safety Warning: When using worker pools, all workers share the same projection instance. If your projection maintains state, it MUST be thread-safe:
// ✅ Good: Thread-safe using atomic operations
type SafeProjection struct {
count int64
}
func (p *SafeProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
atomic.AddInt64(&p.count, 1) // Thread-safe
return nil
}
// ✅ Good: Stateless (only updates database)
type StatelessProjection struct{}
func (p *StatelessProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
_, err := tx.ExecContext(ctx, "INSERT INTO read_model ...") // Database handles concurrency
return err
}
// ❌ Bad: Not thread-safe
type UnsafeProjection struct {
count int // Race condition!
}
func (p *UnsafeProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
p.count++ // NOT thread-safe!
return nil
}
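If the shared state is more complex than a single counter, a sync.Mutex is another option; a sketch (not taken from the library's examples):

// ✅ Also fine: guard richer shared state with a mutex
type MutexProjection struct {
	mu            sync.Mutex
	lastProcessed time.Time
}

func (p *MutexProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
	p.mu.Lock()
	p.lastProcessed = time.Now() // safe: only one goroutine mutates at a time
	p.mu.Unlock()
	return nil
}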
See worker-pool example for details.
When to Use Each Pattern¶
| Pattern | Best For | Pros | Cons |
|---|---|---|---|
| Single Worker | < 1K events/sec | Simple | Limited throughput |
| Worker Pool | 2-8 partitions, one machine | Easy deployment | Single point of failure |
| Separate Processes | > 8 partitions, multiple machines | Better isolation | More complex deployment |
Partitioning Strategy¶
Default: HashPartitionStrategy¶
Uses FNV-1a hash for deterministic, even distribution:
type HashPartitionStrategy struct{}
func (HashPartitionStrategy) ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool {
if totalPartitions <= 1 {
return true
}
h := fnv.New32a()
h.Write([]byte(aggregateID))
partition := int(h.Sum32()) % totalPartitions
return partition == partitionKey
}
Custom Partitioning¶
Implement PartitionStrategy for custom logic:
type RegionPartitionStrategy struct {
region string
}
func (s RegionPartitionStrategy) ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool {
// Custom logic - e.g., based on aggregate prefix
// "us-" prefix goes to partition 0
// "eu-" prefix goes to partition 1
if strings.HasPrefix(aggregateID, "us-") {
return partitionKey == 0
} else if strings.HasPrefix(aggregateID, "eu-") {
return partitionKey == 1
}
// Default to hash for others
return HashPartitionStrategy{}.ShouldProcess(aggregateID, partitionKey, totalPartitions)
}
Running Multiple Projections¶
Pattern 1: Separate Processes¶
Run each projection in its own process for better isolation:
# Process 1
./myapp projection --name=user_counter
# Process 2
./myapp projection --name=analytics
# Process 3
./myapp projection --name=order_summary
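One possible wiring for the --name flag, reusing processor.Run from the prioritization example below (the flag parsing is illustrative; UserCounterProjection and AnalyticsProjection appear elsewhere in this guide, while OrderSummaryProjection is an assumed name):

// Illustrative --name flag wiring; each process runs exactly one projection
name := flag.String("name", "", "projection to run")
flag.Parse()

config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)

var err error
switch *name {
case "user_counter":
	err = processor.Run(ctx, &UserCounterProjection{})
case "analytics":
	err = processor.Run(ctx, &AnalyticsProjection{})
case "order_summary":
	err = processor.Run(ctx, &OrderSummaryProjection{})
default:
	err = fmt.Errorf("unknown projection: %s", *name)
}
if err != nil {
	log.Fatal(err)
}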
Pattern 2: Same Process¶
Run multiple projections in the same process:
import "github.com/getpup/pupsourcing/es/projection/runner"
store := postgres.NewStore(postgres.DefaultStoreConfig())
// Create a single processor - it can be reused for multiple projections
// since processors are stateless orchestrators
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)
r := runner.New()
err := r.Run(ctx, []runner.ProjectionRunner{
{Projection: &UserCounterProjection{}, Processor: processor},
{Projection: &EmailSenderProjection{}, Processor: processor},
{Projection: &AnalyticsProjection{}, Processor: processor},
})
See multiple-projections example for details.
Trade-offs¶
| Approach | Pros | Cons |
|---|---|---|
| Separate Processes | Better isolation, independent scaling | More processes to manage |
| Same Process | Simpler deployment, shared resources | One failure affects all |
When to Mix¶
You can run:
- Fast projections together
- Slow projections in separate processes with their own partitioning
- Critical projections isolated from non-critical ones
Performance Tuning¶
Batch Size¶
Controls how many events are processed per transaction:
config := projection.DefaultProcessorConfig()
config.BatchSize = 100 // Default
// Larger batches: better throughput, higher latency
config.BatchSize = 1000
// Smaller batches: lower latency, more transactions
config.BatchSize = 10
Guidelines:
- Fast projections: 500-1000
- Slow projections: 10-50
- Default (100) works for most cases
Connection Pooling¶
Configure database connection pool:
db, _ := sql.Open("postgres", connStr)
db.SetMaxOpenConns(25) // Limit concurrent connections
db.SetMaxIdleConns(5) // Idle connections to keep
db.SetConnMaxLifetime(5 * time.Minute)
Checkpoint Frequency¶
Checkpoint is updated after each batch. To reduce checkpoint writes:
// Process more events per checkpoint
config.BatchSize = 500
// But consider: larger batches = more reprocessing on crash
Monitoring¶
Track these metrics:
-- Projection lag
SELECT
projection_name,
(SELECT MAX(global_position) FROM events) - last_global_position as lag
FROM projection_checkpoints;
-- Processing rate
SELECT
projection_name,
last_global_position,
updated_at
FROM projection_checkpoints
ORDER BY updated_at DESC;
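The lag query can also be polled from Go and exported to whatever metrics system you use; a sketch (the 30-second interval and the plain log output are illustrative):

// Periodically log projection lag (sketch; replace the log output with real metrics as needed)
func monitorLag(ctx context.Context, db *sql.DB) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rows, err := db.QueryContext(ctx, `
				SELECT projection_name,
				       COALESCE((SELECT MAX(global_position) FROM events), 0) - last_global_position AS lag
				FROM projection_checkpoints`)
			if err != nil {
				log.Printf("lag query failed: %v", err)
				continue
			}
			for rows.Next() {
				var name string
				var lag int64
				if err := rows.Scan(&name, &lag); err == nil {
					log.Printf("projection %s lag: %d events", name, lag)
				}
			}
			rows.Close()
		}
	}
}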
Production Patterns¶
Pattern 1: Gradual Scaling¶
Start with 1 worker, scale up as needed:
Day 1: 1 worker (handles 100%)
Day 5: 2 workers (each handles ~50%)
Day 10: 4 workers (each handles ~25%)
Day 30: 8 workers (each handles ~12.5%)
See scaling example for a demonstration.
Pattern 2: Projection Prioritization¶
Run critical projections with more resources:
store := postgres.NewStore(postgres.DefaultStoreConfig())
// Critical: User data (4 workers)
userRunners := make([]runner.ProjectionRunner, 4)
for i := 0; i < 4; i++ {
config := projection.DefaultProcessorConfig()
config.PartitionKey = i
config.TotalPartitions = 4
processor := postgres.NewProcessor(db, store, &config)
userRunners[i] = runner.ProjectionRunner{
Projection: &UserProjection{},
Processor: processor,
}
}
// Normal: Analytics (2 workers, separate process)
analyticsRunners := make([]runner.ProjectionRunner, 2)
for i := 0; i < 2; i++ {
config := projection.DefaultProcessorConfig()
config.PartitionKey = i
config.TotalPartitions = 2
processor := postgres.NewProcessor(db, store, &config)
analyticsRunners[i] = runner.ProjectionRunner{
Projection: &AnalyticsProjection{},
Processor: processor,
}
}
// Low priority: Reports (1 worker, best-effort)
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)
processor.Run(ctx, &ReportProjection{})
Pattern 3: Hot/Cold Separation¶
Process recent events quickly, older events more slowly:
// Hot path: Recent events (small batches, low latency)
hotConfig := projection.DefaultProcessorConfig()
hotConfig.BatchSize = 10
hotProcessor := postgres.NewProcessor(db, store, &hotConfig)
// Cold path: Historical events (large batches, high throughput)
coldConfig := projection.DefaultProcessorConfig()
coldConfig.BatchSize = 1000
coldProcessor := postgres.NewProcessor(db, store, &coldConfig)
Pattern 4: Idempotent Projections¶
Always make projections idempotent to handle reprocessing:
// ✅ Good: Idempotent insert
_, err := tx.ExecContext(ctx,
"INSERT INTO users (id, email) VALUES ($1, $2)"+
"ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email",
userID, email)
// ❌ Bad: Non-idempotent increment
_, err := tx.ExecContext(ctx,
"UPDATE counters SET count = count + 1")
// ✅ Better: Track processed events
_, err := tx.ExecContext(ctx,
"INSERT INTO processed_events (event_id) VALUES ($1)"+
"ON CONFLICT (event_id) DO NOTHING",
eventID)
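Putting the last two snippets together, the processed_events table can guard a non-idempotent write; a sketch (table and column names come from the examples above, and the *sql.Tx parameter is assumed to be the projection's current transaction):

// Sketch: apply the increment only if this event has not been processed before
func applyCounterEvent(ctx context.Context, tx *sql.Tx, eventID string) error {
	res, err := tx.ExecContext(ctx,
		"INSERT INTO processed_events (event_id) VALUES ($1) "+
			"ON CONFLICT (event_id) DO NOTHING",
		eventID)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return nil // duplicate delivery: skip the increment
	}
	_, err = tx.ExecContext(ctx, "UPDATE counters SET count = count + 1")
	return err
}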
Advanced Topics¶
Projection Rebuilding¶
To rebuild a projection from scratch:
-- 1. Delete checkpoint
DELETE FROM projection_checkpoints WHERE projection_name = 'my_projection';
-- 2. Clear read model
TRUNCATE TABLE my_read_model;
-- 3. Restart projection - it will reprocess all events
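The same reset can be scripted so it runs atomically before the worker restarts; a sketch in Go (the projection and table names come from the SQL above):

// Sketch: delete the checkpoint and clear the read model in one transaction
func resetProjection(ctx context.Context, db *sql.DB) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()
	if _, err := tx.ExecContext(ctx,
		"DELETE FROM projection_checkpoints WHERE projection_name = $1", "my_projection"); err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx, "TRUNCATE TABLE my_read_model"); err != nil {
		return err
	}
	return tx.Commit() // restart the projection worker afterwards
}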
Snapshot Support¶
For long-lived aggregates, consider snapshots:
// Read from snapshot position
snapshotVersion := int64(1000)
recentEvents, err := store.ReadAggregateStream(ctx, tx, "Identity", "User", aggregateID, &snapshotVersion, nil)
Error Handling¶
func (p *MyProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
// Transient errors: return error to retry
if err := someOperation(); err != nil {
return fmt.Errorf("transient error: %w", err)
}
// Permanent errors: log and skip
if err := validate(event); err != nil {
log.Printf("Invalid event %s: %v", event.EventID, err)
return nil // Skip this event
}
return nil
}
Database Partitioning by Bounded Context¶
For high-volume systems with multiple bounded contexts, you can leverage PostgreSQL's table partitioning to improve query performance and data management. This is particularly useful when different contexts have different scaling characteristics or retention policies.
Why Partition by Bounded Context?¶
Benefits:
- Improved Query Performance: Queries filtering by bounded context only scan relevant partitions
- Independent Scaling: Different contexts can have different storage strategies
- Flexible Retention: Apply different retention policies per context (e.g., keep Identity events longer than Analytics)
- Maintenance Efficiency: Backup, restore, or vacuum individual contexts independently
- Clear Data Boundaries: Physical separation reinforces logical domain boundaries
Example Scenario:
- Identity context: 1M events/day, retain 7 years for compliance
- Billing context: 500K events/day, retain 10 years for audit
- Analytics context: 5M events/day, retain 90 days for operational insights
PostgreSQL List Partitioning by Bounded Context¶
Create partitions for each bounded context:
-- 1. Create partitioned events table (using your existing schema)
CREATE TABLE events (
-- ... all your event columns here ...
PRIMARY KEY (bounded_context, aggregate_type, aggregate_id, aggregate_version)
) PARTITION BY LIST (bounded_context);
-- 2. Create partition for Identity context
CREATE TABLE events_identity PARTITION OF events
FOR VALUES IN ('Identity');
-- 3. Create partition for Billing context
CREATE TABLE events_billing PARTITION OF events
FOR VALUES IN ('Billing');
-- 4. Create partition for Analytics context
CREATE TABLE events_analytics PARTITION OF events
FOR VALUES IN ('Analytics');
-- Add indexes per partition for query performance
CREATE INDEX idx_events_identity_global_position ON events_identity (global_position);
CREATE INDEX idx_events_billing_global_position ON events_billing (global_position);
CREATE INDEX idx_events_analytics_global_position ON events_analytics (global_position);
-- 5. Partition aggregate_heads similarly
CREATE TABLE aggregate_heads (
bounded_context TEXT NOT NULL,
aggregate_type TEXT NOT NULL,
aggregate_id TEXT NOT NULL,
aggregate_version BIGINT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (bounded_context, aggregate_type, aggregate_id)
) PARTITION BY LIST (bounded_context);
CREATE TABLE aggregate_heads_identity PARTITION OF aggregate_heads
FOR VALUES IN ('Identity');
CREATE TABLE aggregate_heads_billing PARTITION OF aggregate_heads
FOR VALUES IN ('Billing');
CREATE TABLE aggregate_heads_analytics PARTITION OF aggregate_heads
FOR VALUES IN ('Analytics');
Query Performance Improvement:
-- Without partitioning: the whole events table (or a large index) must be scanned
SELECT * FROM events WHERE bounded_context = 'Identity' AND aggregate_type = 'User';
-- With partitioning: Only scans events_identity partition
SELECT * FROM events WHERE bounded_context = 'Identity' AND aggregate_type = 'User';
-- PostgreSQL automatically routes to events_identity partition
Adding New Bounded Contexts¶
When adding a new bounded context, create its partition:
-- Add new "Catalog" context
CREATE TABLE events_catalog PARTITION OF events
FOR VALUES IN ('Catalog');
CREATE INDEX idx_events_catalog_global_position
ON events_catalog (global_position);
CREATE INDEX idx_events_catalog_aggregate
ON events_catalog (aggregate_type, aggregate_id);
CREATE INDEX idx_events_catalog_created_at
ON events_catalog (created_at);
CREATE TABLE aggregate_heads_catalog PARTITION OF aggregate_heads
FOR VALUES IN ('Catalog');
⚠️ Important: You must create a partition before writing events to that bounded context, otherwise PostgreSQL will reject the insert.
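A migration step can enforce that ordering by creating the partitions idempotently before the first write; a sketch in Go using CREATE ... IF NOT EXISTS (the Catalog table and index names follow the SQL above):

// Sketch: ensure partitions for a new bounded context exist before events are appended
func ensureCatalogPartitions(ctx context.Context, db *sql.DB) error {
	stmts := []string{
		`CREATE TABLE IF NOT EXISTS events_catalog PARTITION OF events FOR VALUES IN ('Catalog')`,
		`CREATE INDEX IF NOT EXISTS idx_events_catalog_global_position ON events_catalog (global_position)`,
		`CREATE TABLE IF NOT EXISTS aggregate_heads_catalog PARTITION OF aggregate_heads FOR VALUES IN ('Catalog')`,
	}
	for _, stmt := range stmts {
		if _, err := db.ExecContext(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}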
Hash Partitioning with Subpartitions¶
For contexts with extremely high volume, combine bounded context partitioning with hash-based subpartitioning:
-- Events table partitioned by bounded context
CREATE TABLE events (
-- columns as before
PRIMARY KEY (bounded_context, aggregate_type, aggregate_id, aggregate_version)
) PARTITION BY LIST (bounded_context);
-- Identity partition with hash subpartitions on aggregate_type
CREATE TABLE events_identity PARTITION OF events
FOR VALUES IN ('Identity')
PARTITION BY HASH (aggregate_type);
CREATE TABLE events_identity_0 PARTITION OF events_identity
FOR VALUES WITH (MODULUS 4, REMAINDER 0);
CREATE TABLE events_identity_1 PARTITION OF events_identity
FOR VALUES WITH (MODULUS 4, REMAINDER 1);
CREATE TABLE events_identity_2 PARTITION OF events_identity
FOR VALUES WITH (MODULUS 4, REMAINDER 2);
CREATE TABLE events_identity_3 PARTITION OF events_identity
FOR VALUES WITH (MODULUS 4, REMAINDER 3);
-- Billing partition without subpartitioning (lower volume)
CREATE TABLE events_billing PARTITION OF events
FOR VALUES IN ('Billing');
When to use subpartitions:
- Query latency on individual bounded context partitions becomes unacceptable for your use case
- Write throughput to a single context partition approaches database limits
- Multiple high-volume aggregate types within the same context would benefit from physical separation
- Different aggregate types have significantly different access patterns or retention needs
Multi-Column Hash Partitioning¶
For even distribution across multiple dimensions:
-- Composite hash on (bounded_context, aggregate_type, aggregate_id)
CREATE TABLE events (
-- columns as before
) PARTITION BY HASH (bounded_context, aggregate_type, aggregate_id);
-- PostgreSQL automatically distributes data across hash partitions
-- Create 16 partitions for even distribution using a loop
DO $$
BEGIN
FOR i IN 0..15 LOOP
EXECUTE format('CREATE TABLE events_%s PARTITION OF events FOR VALUES WITH (MODULUS 16, REMAINDER %s)',
LPAD(i::text, 2, '0'), i);
END LOOP;
END $$;
Benefit: Automatic load balancing across partitions without managing per-context partitions.
Trade-off: Lose ability to manage contexts independently (e.g., different retention policies).
Migration Strategy¶
To migrate from non-partitioned to partitioned tables:
-- 1. Create new partitioned table
CREATE TABLE events_partitioned (
-- same columns as events
) PARTITION BY LIST (bounded_context);
-- 2. Create partitions for each context
-- (as shown above)
-- 3. Copy data in batches (keyset pagination on global_position avoids re-copying the same rows)
INSERT INTO events_partitioned
SELECT * FROM events
WHERE bounded_context = 'Identity'
  AND global_position > 0  -- replace 0 with the highest global_position already copied
ORDER BY global_position
LIMIT 100000;
-- Repeat, advancing the global_position bound, until all Identity events are migrated
-- Then repeat for other contexts
-- 4. Swap tables (in transaction)
BEGIN;
ALTER TABLE events RENAME TO events_old;
ALTER TABLE events_partitioned RENAME TO events;
COMMIT;
-- 5. Verify and drop old table
DROP TABLE events_old;
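Step 3 can be driven from a small loop that remembers the last position it copied; a sketch (the batch size is illustrative, and the loop assumes global_position increases monotonically, as in the monitoring queries above):

// Sketch: copy one bounded context into the partitioned table in keyset-paginated batches
func copyContext(ctx context.Context, db *sql.DB, boundedContext string) error {
	var last int64 // highest global_position copied so far
	for {
		res, err := db.ExecContext(ctx, `
			INSERT INTO events_partitioned
			SELECT * FROM events
			WHERE bounded_context = $1 AND global_position > $2
			ORDER BY global_position
			LIMIT 100000`, boundedContext, last)
		if err != nil {
			return err
		}
		if n, _ := res.RowsAffected(); n == 0 {
			return nil // nothing left to copy for this context
		}
		err = db.QueryRowContext(ctx,
			`SELECT MAX(global_position) FROM events_partitioned WHERE bounded_context = $1`,
			boundedContext).Scan(&last)
		if err != nil {
			return err
		}
	}
}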
Partition Maintenance¶
Detach old partitions for archival:
-- Detach Analytics partition for archival (after 90 days)
ALTER TABLE events DETACH PARTITION events_analytics;
-- Archive to cold storage (pg_dump is run from the shell, not psql)
pg_dump -t events_analytics > analytics_archive.sql
-- Drop or keep as standalone table
DROP TABLE events_analytics;
Attach archived partition for historical queries:
-- Restore from archive (psql is run from the shell)
psql -f analytics_archive.sql
-- Reattach
ALTER TABLE events ATTACH PARTITION events_analytics
FOR VALUES IN ('Analytics');
Application Code Considerations¶
No changes are required in application code; partitioning is transparent to pupsourcing:
// Same code works with or without partitioning
events := []es.Event{
{
BoundedContext: "Identity", // Routes to events_identity partition
AggregateType: "User",
AggregateID: userID,
// ...
},
}
result, err := store.Append(ctx, tx, es.NoStream(), events)
For ScopedProjection filtering:
// Projection automatically benefits from partition pruning
func (p *UserReadModel) BoundedContexts() []string {
return []string{"Identity"} // PostgreSQL only scans events_identity
}
func (p *UserReadModel) AggregateTypes() []string {
return []string{"User"}
}
Monitoring Partition Performance¶
-- Check partition sizes
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE tablename LIKE 'events_%'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- Verify partition pruning in queries
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM events
WHERE bounded_context = 'Identity' AND aggregate_type = 'User';
-- The plan should reference only the events_identity partition; partitions pruned at plan
-- time simply do not appear (runtime pruning is reported as "Subplans Removed: N")
Best Practices¶
- Plan contexts upfront: Adding partitions is easy, but reorganizing is expensive
- Create partitions before use: Inserts fail if partition doesn't exist
- Index each partition separately: Indexes are not inherited automatically
- Monitor partition sizes: Rebalance if one partition grows disproportionately
- Use consistent naming: events_{context_name_lower} for clarity
- Document retention policies: Make context-specific rules explicit
- Test migrations: Always test partition changes on staging first
When NOT to Partition¶
- Few contexts (<3): Overhead outweighs benefits
- Low volume (<10M events total): Partitioning adds complexity without gains
- Uniform access patterns: If all queries scan all contexts, partitioning doesn't help
- Small team: Operational complexity may not be worth it
See Also¶
- Getting Started - Basic setup
- Scaling Example - Dynamic scaling demonstration
- Deployment Guide - Production deployment patterns
- Industry Alignment - Comparison with other systems