Deployment & Operations Guide¶
This guide covers running pupsourcing workers in production — from a minimal setup to Kubernetes deployments.
Table of Contents¶
- Prerequisites
- Minimal Production Setup
- Database Connection Pooling
- Scaling
- Dispatcher Configuration
- Docker Compose Example
- Kubernetes Example
- Operational Checklist
- Troubleshooting
Prerequisites¶
Before deploying:
- Database migrations applied — both the event store tables and the worker tables
- Consumers implemented — at least one type satisfying `consumer.Consumer` or `consumer.ScopedConsumer`
- Worker configuration decided — dispatcher strategy, batch size, and connection pool limits
- PostgreSQL 12+ accessible from all worker instances
Minimal Production Setup¶
A complete main() for a production worker process:
```go
package main

import (
	"context"
	"database/sql"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	_ "github.com/lib/pq"

	"github.com/pupsourcing/store/consumer"
	storepostgres "github.com/pupsourcing/store/postgres"
	"github.com/pupsourcing/worker"
)

func main() {
	connStr := os.Getenv("DATABASE_URL")
	if connStr == "" {
		log.Fatal("DATABASE_URL is required")
	}

	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Connection pool settings
	db.SetMaxOpenConns(15)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(30 * time.Minute)
	db.SetConnMaxIdleTime(5 * time.Minute)

	// Event store
	eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

	// Register consumers
	consumers := []consumer.Consumer{
		&OrderProjection{},
		&InventoryProjection{},
		&NotificationConsumer{},
	}

	// Create worker with production options
	w := worker.New(db, eventStore, consumers,
		worker.WithBatchSize(200),
		worker.WithPollInterval(500*time.Millisecond),
		worker.WithMaxPollInterval(15*time.Second),
		worker.WithBatchPause(100*time.Millisecond),
		worker.WithHeartbeatInterval(5*time.Second),
		worker.WithHeartbeatTimeout(30*time.Second),
		worker.WithMaxConsecutiveFailures(10),
	)

	// Graceful shutdown on SIGTERM/SIGINT
	ctx, cancel := signal.NotifyContext(context.Background(),
		syscall.SIGTERM, syscall.SIGINT)
	defer cancel()

	log.Printf("worker %s starting", w.ID())
	if err := w.Start(ctx); err != nil {
		log.Fatalf("worker exited with error: %v", err)
	}
	log.Println("worker stopped gracefully")
}
```
Run the same binary across multiple instances for horizontal scaling. No instance-specific configuration is needed — each worker self-registers and the leader assigns consumers automatically.
Database Connection Pooling¶
Properly sized connection pools prevent exhaustion without wasting resources.
Recommended Settings¶
```go
db.SetMaxOpenConns(15)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(30 * time.Minute)
db.SetConnMaxIdleTime(5 * time.Minute)
```
Why These Values¶
- `MaxOpenConns(15)`: Each worker runs concurrent operations — heartbeat updates, leader operations, and one processing transaction per assigned consumer. 15 connections provides headroom without exhausting the server. With 4 worker replicas × 15 = 60 connections, which fits within PostgreSQL's default `max_connections=100`.
- `MaxIdleConns(5)`: Keeps a warm pool of ready connections for burst traffic without hoarding connections during quiet periods.
- `ConnMaxLifetime(30min)`: Forces periodic connection rotation, which helps with DNS-based failover and load balancer draining and prevents issues with long-lived connections.
- `ConnMaxIdleTime(5min)`: Reclaims connections idle for 5 minutes, preventing stale connections from accumulating.
Sizing Formula¶
```
max_connections >= (worker_replicas × max_open_conns) + admin_headroom
```

For example, with 4 workers and `MaxOpenConns(15)`:

```
max_connections >= (4 × 15) + 10 = 70
```
Warning
If using the notify dispatcher strategy, each worker opens one additional dedicated connection for `LISTEN` that is not drawn from the `database/sql` pool. Factor this into your `max_connections` calculation.
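The sizing formula above, including the notify strategy's extra `LISTEN` connection, can be sketched as a small helper. The function name and structure here are illustrative, not part of the library:

```go
package main

import "fmt"

// requiredMaxConnections estimates the PostgreSQL max_connections a fleet
// of workers needs. Illustrative helper, not part of pupsourcing.
func requiredMaxConnections(replicas, maxOpenConns, adminHeadroom int, notify bool) int {
	perWorker := maxOpenConns
	if notify {
		perWorker++ // dedicated LISTEN connection outside the database/sql pool
	}
	return replicas*perWorker + adminHeadroom
}

func main() {
	// 4 workers, MaxOpenConns(15), 10 admin connections, poll strategy.
	fmt.Println(requiredMaxConnections(4, 15, 10, false)) // 70
	// Same fleet with the notify strategy: one extra connection per worker.
	fmt.Println(requiredMaxConnections(4, 15, 10, true)) // 74
}
```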
Scaling¶
Horizontal Scaling¶
Scaling is automatic. Deploy more worker instances and the leader rebalances consumers:
- New worker registers in `worker_nodes`
- Leader detects the new worker on its next rebalance cycle (~5s)
- Leader computes new round-robin assignment
- Workers sync assignments and start/stop consumers
Scaling down is equally automatic: when a worker stops, the leader detects the missing heartbeat after `HeartbeatTimeout` (default 30s) and reassigns its consumers.
Distribution Table¶
With 6 consumers (Analytics, Billing, Email, Inventory, Orders, Shipping):
| Workers | Distribution |
|---|---|
| 1 | All 6 on worker-1 |
| 2 | 3 + 3 (round-robin) |
| 3 | 2 + 2 + 2 |
| 4 | 2 + 2 + 1 + 1 |
| 6 | 1 per worker |
| 7+ | 6 active, extras are hot standbys |
Scaling Guidelines¶
- Start with 1 worker and scale based on observed consumer lag
- Each consumer runs on exactly one worker at a time — no duplicate processing
- More workers than consumers means some workers sit idle as standbys
- All instances run identical binaries with identical configuration
Dispatcher Configuration¶
Choose a dispatcher strategy based on your infrastructure.
Poll Strategy (Default)¶
```go
w := worker.New(db, eventStore, consumers,
	worker.WithDispatcherStrategy(worker.DispatcherStrategyPoll),
	worker.WithDispatcherInterval(200*time.Millisecond),
)
```
- Checks `MAX(global_position)` every 200ms
- Works everywhere — any PostgreSQL setup, including PgBouncer in transaction pooling mode
- Latency floor: ~200ms (configurable via `DispatcherInterval`)
Notify Strategy (Low Latency)¶
```go
w := worker.New(db, eventStore, consumers,
	worker.WithDispatcherStrategy(worker.DispatcherStrategyNotify),
	worker.WithNotifyConnectionString(connStr),
	worker.WithNotifyChannel("app_events"),
)
```
- Uses PostgreSQL `LISTEN`/`NOTIFY` for sub-millisecond wakeup
- Falls back to 1-second reconciliation polling
- Requires a dedicated connection to PostgreSQL (not through PgBouncer in transaction mode)
Warning
Two things must match for notify to work:
- The store must be configured to `NOTIFY` on the same channel when appending events
- `WithNotifyConnectionString` must point directly to PostgreSQL — `LISTEN` is session-scoped and incompatible with PgBouncer transaction pooling
Decision Guide¶
| Criteria | Poll | Notify |
|---|---|---|
| PgBouncer compatible | ✅ Yes | ❌ Transaction mode breaks it |
| Latency | ~200ms | Sub-millisecond |
| Extra connections | None | 1 per worker |
| Complexity | Simple | Slightly more setup |
| Recommended for | Most deployments | Latency-sensitive workloads |
Docker Compose Example¶
A simple 2-replica setup:
```yaml
version: "3.9"
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: myapp
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  worker:
    image: myapp:latest
    depends_on:
      - postgres
    environment:
      DATABASE_URL: postgres://postgres:postgres@postgres:5432/myapp?sslmode=disable
    command: ["./myapp", "run-consumers"]
    restart: unless-stopped
    deploy:
      replicas: 2

volumes:
  pgdata:
```
Both replicas run the same image and configuration. The leader automatically distributes consumers between them.
To scale up:
```sh
docker compose up -d --scale worker=4
```
Kubernetes Example¶
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-worker
  labels:
    app: myapp-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp-worker
  template:
    metadata:
      labels:
        app: myapp-worker
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: worker
          image: myapp:latest
          args: ["run-consumers"]
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: url
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
```
Health Considerations¶
The worker does not expose an HTTP endpoint by default. For Kubernetes health checks, add a lightweight HTTP server to your binary:
```go
go func() {
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Printf("health server: %v", err)
	}
}()
```
Then add probes to the container spec:
```yaml
livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10
```
Graceful Shutdown¶
Set `terminationGracePeriodSeconds` high enough for in-flight batches to complete (the Kubernetes default is 30s). The worker handles SIGTERM via context cancellation and drains all active processing before exiting.
Operational Checklist¶
Monitoring¶
- Consumer lag: Compare `consumer_checkpoints.last_position` with `MAX(global_position)` from the event store. Growing lag means consumers can't keep up.
```sql
SELECT cp.consumer_name,
       cp.last_position,
       (SELECT MAX(global_position) FROM events) AS latest,
       (SELECT MAX(global_position) FROM events) - cp.last_position AS lag
FROM consumer_checkpoints cp
ORDER BY lag DESC;
```
- Worker health: Check `worker_nodes` for stale heartbeats.
```sql
SELECT worker_id,
       heartbeat_at,
       NOW() - heartbeat_at AS age
FROM worker_nodes
ORDER BY heartbeat_at DESC;
```
- Consumer distribution: Verify consumers are evenly assigned.
```sql
SELECT ca.worker_id, COUNT(*) AS consumer_count
FROM consumer_assignments ca
GROUP BY ca.worker_id;
```
Configuration Checklist¶
- [ ] Database migrations applied (store + worker tables)
- [ ] Connection pool limits set (`SetMaxOpenConns`, etc.)
- [ ] PostgreSQL `max_connections` accommodates all workers
- [ ] Graceful shutdown configured (SIGTERM handling)
- [ ] Logging enabled (`WithLogger`)
- [ ] `MaxConsecutiveFailures` set appropriately (higher for transient-error-prone consumers)
- [ ] Dispatcher strategy chosen (poll vs notify)
- [ ] If using notify: dedicated connection string configured, channel name matches store config
Troubleshooting¶
Connection Exhaustion¶
Symptoms: `too many connections` errors, workers timing out on database calls, frequent reconnects.
Diagnose:
```sql
SELECT count(*) FROM pg_stat_activity;
SELECT setting FROM pg_settings WHERE name = 'max_connections';
```
Fix:
- Reduce `MaxOpenConns` per worker or increase PostgreSQL `max_connections`
- Use the sizing formula: `(workers × max_open_conns) + headroom`
- If using the notify strategy, add 1 extra connection per worker to the formula
Workers Not Claiming Consumers¶
Check workers are alive:
```sql
SELECT worker_id, heartbeat_at, NOW() - heartbeat_at AS age
FROM worker_nodes
ORDER BY heartbeat_at DESC;
```
Check the leader exists:
```sql
SELECT pid, granted FROM pg_locks
WHERE locktype = 'advisory' AND granted = true;
```
Check assignments:
```sql
SELECT consumer_name, worker_id FROM consumer_assignments;
```
If `consumer_assignments` is empty, the leader hasn't run a rebalance yet. Wait up to `RebalanceInterval` (default 5s).
Consumer Lag Increasing¶
If lag grows despite workers being healthy:
- Increase `BatchSize` — process more events per cycle
- Reduce `PollInterval` — poll more aggressively
- Profile `Handle()` functions — slow handlers are the most common bottleneck
- Add workers — if lag is spread across many consumers, more workers help
- Check database performance — slow queries in handlers amplify lag
Uneven Distribution After Scaling¶
Consumer assignments converge after a rebalance cycle. If distribution looks wrong:
- Verify all workers have current heartbeats (not stale)
- Wait at least `RebalanceInterval` (default 5s) after the scaling event
- Check that `HeartbeatTimeout` hasn't been set too low, causing false worker deaths
- The assignment is deterministic round-robin — with N consumers and M workers, each worker gets ⌊N/M⌋ or ⌈N/M⌉ consumers
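The ⌊N/M⌋ / ⌈N/M⌉ split is easy to check by hand: dealing sorted consumers out one at a time sends consumer `i` to worker `i mod M`. This sketch reproduces the distribution table above for the 4-worker case; it is illustrative, not the library's actual implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// roundRobin assigns consumers to workers by sorting both lists for
// determinism and dealing consumers out one at a time.
// Illustrative sketch, not pupsourcing's actual assignment code.
func roundRobin(consumers, workers []string) map[string][]string {
	sort.Strings(consumers)
	sort.Strings(workers)
	assign := make(map[string][]string, len(workers))
	for i, c := range consumers {
		w := workers[i%len(workers)]
		assign[w] = append(assign[w], c)
	}
	return assign
}

func main() {
	consumers := []string{"Analytics", "Billing", "Email", "Inventory", "Orders", "Shipping"}
	workers := []string{"worker-1", "worker-2", "worker-3", "worker-4"}
	assign := roundRobin(consumers, workers)
	for _, w := range workers {
		fmt.Println(w, assign[w])
	}
	// worker-1 [Analytics Orders]
	// worker-2 [Billing Shipping]
	// worker-3 [Email]
	// worker-4 [Inventory]
}
```

With 6 consumers on 4 workers this yields the 2 + 2 + 1 + 1 split from the distribution table.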
See Also¶
- Worker — architecture, configuration reference, and processing semantics
- Consumers — implementing Consumer and ScopedConsumer interfaces
- Observability — monitoring and logging