Deployment & Operations Guide

This guide covers running pupsourcing workers in production — from a minimal setup to Kubernetes deployments.

Prerequisites

Before deploying:

  1. Database migrations applied — both the event store tables and the worker tables
  2. Consumers implemented — at least one type satisfying consumer.Consumer or consumer.ScopedConsumer
  3. Worker configuration decided — dispatcher strategy, batch size, and connection pool limits
  4. PostgreSQL 12+ accessible from all worker instances

Minimal Production Setup

A complete main() for a production worker process:

package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    _ "github.com/lib/pq"
    "github.com/pupsourcing/store/consumer"
    storepostgres "github.com/pupsourcing/store/postgres"
    "github.com/pupsourcing/worker"
)

func main() {
    connStr := os.Getenv("DATABASE_URL")
    if connStr == "" {
        log.Fatal("DATABASE_URL is required")
    }

    db, err := sql.Open("postgres", connStr)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // sql.Open does not establish a connection; verify connectivity up front.
    if err := db.Ping(); err != nil {
        log.Fatalf("database unreachable: %v", err)
    }

    // Connection pool settings
    db.SetMaxOpenConns(15)
    db.SetMaxIdleConns(5)
    db.SetConnMaxLifetime(30 * time.Minute)
    db.SetConnMaxIdleTime(5 * time.Minute)

    // Event store
    eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

    // Register consumers. OrderProjection, InventoryProjection, and
    // NotificationConsumer stand in for your application's own consumer types.
    consumers := []consumer.Consumer{
        &OrderProjection{},
        &InventoryProjection{},
        &NotificationConsumer{},
    }

    // Create worker with production options
    w := worker.New(db, eventStore, consumers,
        worker.WithBatchSize(200),
        worker.WithPollInterval(500*time.Millisecond),
        worker.WithMaxPollInterval(15*time.Second),
        worker.WithBatchPause(100*time.Millisecond),
        worker.WithHeartbeatInterval(5*time.Second),
        worker.WithHeartbeatTimeout(30*time.Second),
        worker.WithMaxConsecutiveFailures(10),
    )

    // Graceful shutdown on SIGTERM/SIGINT
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGTERM, syscall.SIGINT)
    defer cancel()

    log.Printf("worker %s starting", w.ID())
    if err := w.Start(ctx); err != nil {
        log.Fatalf("worker exited with error: %v", err)
    }
    log.Println("worker stopped gracefully")
}

Run the same binary across multiple instances for horizontal scaling. No instance-specific configuration is needed — each worker self-registers and the leader assigns consumers automatically.

Database Connection Pooling

Properly sized connection pools prevent exhaustion without wasting resources.

db.SetMaxOpenConns(15)
db.SetMaxIdleConns(5)
db.SetConnMaxLifetime(30 * time.Minute)
db.SetConnMaxIdleTime(5 * time.Minute)

Why These Values

  • MaxOpenConns(15): Each worker runs concurrent operations — heartbeat updates, leader operations, and one processing transaction per assigned consumer. 15 connections provides headroom without exhausting the server. With 4 worker replicas × 15 = 60 connections, which fits within PostgreSQL's default max_connections=100.

  • MaxIdleConns(5): Keeps a warm pool of ready connections for burst traffic without hoarding connections during quiet periods.

  • ConnMaxLifetime(30min): Forces periodic connection rotation. This helps with DNS-based failover and load-balancer draining, and prevents issues that accumulate in long-lived connections.

  • ConnMaxIdleTime(5min): Reclaims connections idle for 5 minutes, preventing stale connections from accumulating.

Sizing Formula

max_connections >= (worker_replicas × max_open_conns) + admin_headroom

For example, with 4 workers and MaxOpenConns(15):

max_connections >= (4 × 15) + 10 = 70
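As a sanity check, the formula is easy to evaluate in code. The helper below is purely illustrative arithmetic, not part of the library:

```go
package main

import "fmt"

// requiredMaxConnections applies the sizing formula above. Setting
// notify to true adds one dedicated LISTEN connection per worker,
// which the notify dispatcher opens outside the database/sql pool.
func requiredMaxConnections(replicas, maxOpenConns, adminHeadroom int, notify bool) int {
    perWorker := maxOpenConns
    if notify {
        perWorker++
    }
    return replicas*perWorker + adminHeadroom
}

func main() {
    fmt.Println(requiredMaxConnections(4, 15, 10, false)) // the worked example: 70
    fmt.Println(requiredMaxConnections(4, 15, 10, true))  // with notify strategy: 74
}
```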

Warning

If using the notify dispatcher strategy, each worker opens one additional dedicated connection for LISTEN that is not drawn from the database/sql pool. Factor this into your max_connections calculation.

Scaling

Horizontal Scaling

Scaling is automatic. Deploy more worker instances and the leader rebalances consumers:

  1. New worker registers in worker_nodes
  2. Leader detects the new worker on its next rebalance cycle (~5s)
  3. Leader computes new round-robin assignment
  4. Workers sync assignments and start/stop consumers

Scaling down is equally automatic: when a worker stops, the leader detects the missing heartbeat after HeartbeatTimeout (default 30s) and reassigns its consumers.

Distribution Table

With 6 consumers (Analytics, Billing, Email, Inventory, Orders, Shipping):

Workers | Distribution
--------|----------------------------------
1       | All 6 on worker-1
2       | 3 + 3 (round-robin)
3       | 2 + 2 + 2
4       | 2 + 2 + 1 + 1
6       | 1 per worker
7+      | 6 active, extras are hot standbys
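The 4-worker row, for example, falls out of a simple deterministic round-robin. The sketch below illustrates the assignment rule described in this guide; it is not the library's actual code:

```go
package main

import (
    "fmt"
    "sort"
)

// assign distributes consumers across workers round-robin, giving
// each worker either ⌊N/M⌋ or ⌈N/M⌉ consumers.
func assign(consumers, workers []string) map[string][]string {
    sort.Strings(consumers) // deterministic input order
    out := make(map[string][]string, len(workers))
    for i, c := range consumers {
        w := workers[i%len(workers)]
        out[w] = append(out[w], c)
    }
    return out
}

func main() {
    consumers := []string{"Analytics", "Billing", "Email", "Inventory", "Orders", "Shipping"}
    workers := []string{"worker-1", "worker-2", "worker-3", "worker-4"}
    byWorker := assign(consumers, workers)
    for _, w := range workers {
        fmt.Println(w, byWorker[w]) // worker-1 and worker-2 get 2 consumers, worker-3 and worker-4 get 1
    }
}
```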

Scaling Guidelines

  • Start with 1 worker and scale based on observed consumer lag
  • Each consumer runs on exactly one worker at a time — no duplicate processing
  • More workers than consumers means some workers sit idle as standbys
  • All instances run identical binaries with identical configuration

Dispatcher Configuration

Choose a dispatcher strategy based on your infrastructure.

Poll Strategy (Default)

w := worker.New(db, eventStore, consumers,
    worker.WithDispatcherStrategy(worker.DispatcherStrategyPoll),
    worker.WithDispatcherInterval(200*time.Millisecond),
)
  • Checks MAX(global_position) every 200ms
  • Works everywhere — any PostgreSQL setup, including PgBouncer in transaction pooling mode
  • Latency floor: ~200ms (configurable via DispatcherInterval)

Notify Strategy (Low Latency)

w := worker.New(db, eventStore, consumers,
    worker.WithDispatcherStrategy(worker.DispatcherStrategyNotify),
    worker.WithNotifyConnectionString(connStr),
    worker.WithNotifyChannel("app_events"),
)
  • Uses PostgreSQL LISTEN/NOTIFY for sub-millisecond wakeup
  • Falls back to 1-second reconciliation polling
  • Requires a dedicated connection to PostgreSQL (not through PgBouncer in transaction mode)

Warning

Two things must match for notify to work:

  1. The store must be configured to NOTIFY on the same channel when appending events
  2. WithNotifyConnectionString must point directly to PostgreSQL — LISTEN is session-scoped and incompatible with PgBouncer transaction pooling

Decision Guide

Criteria             | Poll             | Notify
---------------------|------------------|--------------------------------
PgBouncer compatible | ✅ Yes           | ❌ Transaction mode breaks it
Latency              | ~200ms           | Sub-millisecond
Extra connections    | None             | 1 per worker
Complexity           | Simple           | Slightly more setup
Recommended for      | Most deployments | Latency-sensitive workloads

Docker Compose Example

A simple 2-replica setup:

version: "3.9"

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: myapp
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  worker:
    image: myapp:latest
    depends_on:
      - postgres
    environment:
      DATABASE_URL: postgres://postgres:postgres@postgres:5432/myapp?sslmode=disable
    command: ["./myapp", "run-consumers"]
    restart: unless-stopped
    deploy:
      replicas: 2

volumes:
  pgdata:

Both replicas run the same image and configuration. The leader automatically distributes consumers between them.

To scale up:

docker compose up -d --scale worker=4

Kubernetes Example

apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-worker
  labels:
    app: myapp-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp-worker
  template:
    metadata:
      labels:
        app: myapp-worker
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: worker
          image: myapp:latest
          args: ["run-consumers"]
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: url
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi

Health Considerations

The worker does not expose an HTTP endpoint by default. For Kubernetes health checks, add a lightweight HTTP server to your binary:

go func() {
    // Requires "net/http" (and "log" for the error path).
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Printf("health server stopped: %v", err)
    }
}()

Then add probes to the container spec:

livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10

Graceful Shutdown

Set terminationGracePeriodSeconds high enough for in-flight batches to complete (default 30s in Kubernetes). The worker handles SIGTERM via context cancellation and drains all active processing before exiting.

Operational Checklist

Monitoring

  • Consumer lag: Compare consumer_checkpoints.last_position with MAX(global_position) from the event store. Growing lag means consumers can't keep up.

    SELECT cp.consumer_name,
           cp.last_position,
           (SELECT MAX(global_position) FROM events) AS latest,
           (SELECT MAX(global_position) FROM events) - cp.last_position AS lag
    FROM consumer_checkpoints cp
    ORDER BY lag DESC;

  • Worker health: Check worker_nodes for stale heartbeats.

    SELECT worker_id,
           heartbeat_at,
           NOW() - heartbeat_at AS age
    FROM worker_nodes
    ORDER BY heartbeat_at DESC;

  • Consumer distribution: Verify consumers are evenly assigned.

    SELECT ca.worker_id, COUNT(*) AS consumer_count
    FROM consumer_assignments ca
    GROUP BY ca.worker_id;

Configuration Checklist

  • [ ] Database migrations applied (store + worker tables)
  • [ ] Connection pool limits set (SetMaxOpenConns, etc.)
  • [ ] PostgreSQL max_connections accommodates all workers
  • [ ] Graceful shutdown configured (SIGTERM handling)
  • [ ] Logging enabled (WithLogger)
  • [ ] MaxConsecutiveFailures set appropriately (higher for transient-error-prone consumers)
  • [ ] Dispatcher strategy chosen (poll vs notify)
  • [ ] If using notify: dedicated connection string configured, channel name matches store config

Troubleshooting

Connection Exhaustion

Symptoms: too many connections errors, workers timing out on database calls, frequent reconnects.

Diagnose:

SELECT count(*) FROM pg_stat_activity;
SELECT setting FROM pg_settings WHERE name = 'max_connections';

Fix:

  • Reduce MaxOpenConns per worker or increase PostgreSQL max_connections
  • Use the sizing formula: (workers × max_open_conns) + headroom
  • If using notify strategy, add 1 extra connection per worker to the formula

Workers Not Claiming Consumers

Check workers are alive:

SELECT worker_id, heartbeat_at, NOW() - heartbeat_at AS age
FROM worker_nodes
ORDER BY heartbeat_at DESC;

Check that a leader holds the advisory lock (note: this lists all granted advisory locks, so unrelated application locks may also appear):

SELECT pid, granted FROM pg_locks
WHERE locktype = 'advisory' AND granted = true;

Check assignments:

SELECT consumer_name, worker_id FROM consumer_assignments;

If consumer_assignments is empty, the leader hasn't run a rebalance yet. Wait up to RebalanceInterval (default 5s).

Consumer Lag Increasing

If lag grows despite workers being healthy:

  1. Increase BatchSize — process more events per cycle
  2. Reduce PollInterval — poll more aggressively
  3. Profile Handle() functions — slow handlers are the most common bottleneck
  4. Add workers — if lag is spread across many consumers, more workers help
  5. Check database performance — slow queries in handlers amplify lag
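The first two adjustments map directly onto the worker options shown earlier. The values below are illustrative starting points, not recommendations:

```go
w := worker.New(db, eventStore, consumers,
    worker.WithBatchSize(500),                     // process more events per cycle
    worker.WithPollInterval(200*time.Millisecond), // poll more aggressively
)
```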

Uneven Distribution After Scaling

Consumer assignments converge after a rebalance cycle. If distribution looks wrong:

  1. Verify all workers have current heartbeats (not stale)
  2. Wait at least RebalanceInterval (default 5s) after the scaling event
  3. Check that HeartbeatTimeout hasn't been set too low, causing false worker deaths
  4. The assignment is deterministic round-robin — with N consumers and M workers, each worker gets ⌊N/M⌋ or ⌈N/M⌉ consumers

See Also

  • Worker — architecture, configuration reference, and processing semantics
  • Consumers — implementing Consumer and ScopedConsumer interfaces
  • Observability — monitoring and logging