Worker

The Worker coordinates distributed consumer processing across multiple nodes using only PostgreSQL: no external message brokers, no ZooKeeper, no Redis. Deploy one or more worker instances and they automatically elect a leader, assign consumers, and process events from the global log.

Installation

go get github.com/pupsourcing/worker

Requires Go 1.24+ and PostgreSQL 12+.

Quick Start

package main

import (
    "context"
    "database/sql"
    "log"
    "os/signal"
    "syscall"

    _ "github.com/lib/pq"
    "github.com/pupsourcing/store/consumer"
    storepostgres "github.com/pupsourcing/store/postgres"
    "github.com/pupsourcing/worker"
)

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
    defer cancel()

    db, err := sql.Open("postgres", "postgres://localhost:5432/myapp?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

    consumers := []consumer.Consumer{
        &OrderProjection{},
        &NotificationConsumer{},
    }

    w := worker.New(db, eventStore, consumers,
        worker.WithBatchSize(100),
    )

    if err := w.Start(ctx); err != nil {
        log.Fatal(err)
    }
}

Start blocks until the context is canceled or a fatal error occurs. Each worker gets a unique UUID on creation, accessible via w.ID().
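The Quick Start references OrderProjection and NotificationConsumer without defining them. Below is a minimal sketch of one, assuming the consumer interface requires Name() and a transactional Handle() as described under Processing Semantics; the event type here is a local stand-in for the store's persisted-event type, not the real API:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// PersistedEvent is a hypothetical stand-in for the store's event type,
// included only so this sketch is self-contained.
type PersistedEvent struct {
	GlobalPosition int64
	EventType      string
	Data           []byte
}

// OrderProjection is a minimal consumer sketch: a stable name plus a
// handler that writes its read model inside the worker-managed transaction.
type OrderProjection struct{}

func (p *OrderProjection) Name() string { return "order-projection" }

func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event PersistedEvent) error {
	// Writing through tx means this update commits atomically with the
	// checkpoint advance (see Processing Semantics).
	_, err := tx.ExecContext(ctx,
		"INSERT INTO order_view (position, event_type) VALUES ($1, $2)",
		event.GlobalPosition, event.EventType)
	return err
}

func main() {
	p := &OrderProjection{}
	fmt.Println(p.Name()) // prints "order-projection"
}
```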

Database Setup

The worker requires three tables in PostgreSQL. Generate the migration using the built-in helper:

import "github.com/pupsourcing/worker/migrations"

config := &migrations.Config{
    OutputFolder:             "./db/migrations",
    OutputFilename:           "002_worker.sql",
    WorkerNodesTable:         "worker_nodes",
    ConsumerAssignmentsTable: "consumer_assignments",
    ConsumerCheckpointsTable: "consumer_checkpoints",
}
migrations.GeneratePostgres(config)

This produces a migration file that creates the following tables:

worker_nodes

Tracks live worker instances. Each worker registers itself on startup and updates its heartbeat periodically.

Column       | Type        | Description
worker_id    | UUID (PK)   | Unique worker identifier
heartbeat_at | TIMESTAMPTZ | Last heartbeat timestamp
created_at   | TIMESTAMPTZ | When the worker first registered
updated_at   | TIMESTAMPTZ | Last update time

Includes an index on heartbeat_at for efficient stale-worker detection.

consumer_assignments

Maps each consumer to the worker responsible for processing it.

Column        | Type        | Description
consumer_name | TEXT (PK)   | Consumer identifier (from Name())
worker_id     | UUID        | Assigned worker
created_at    | TIMESTAMPTZ | When the assignment was created
updated_at    | TIMESTAMPTZ | Last update time

Includes an index on worker_id for efficient per-worker lookups.

consumer_checkpoints

Stores the last processed position for each consumer in the global event log.

Column        | Type        | Description
consumer_name | TEXT (PK)   | Consumer identifier
last_position | BIGINT      | Last processed global position
created_at    | TIMESTAMPTZ | When the checkpoint was created
updated_at    | TIMESTAMPTZ | Last update time

Architecture

When a worker starts, it launches several concurrent background loops that coordinate processing across the cluster:

┌─────────────────────────────────────────────────┐
│                   Worker Node                   │
│                                                 │
│  ┌─────────────┐  ┌──────────────────────────┐  │
│  │  Heartbeat  │  │     Leader Loop          │  │
│  │  Loop       │  │  (pg_try_advisory_lock)  │  │
│  │  (every ~5s)│  │  - List live workers     │  │
│  │             │  │  - Rebalance consumers   │  │
│  └─────────────┘  └──────────────────────────┘  │
│                                                 │
│  ┌─────────────┐  ┌──────────────────────────┐  │
│  │  Assignment │  │     Dispatcher Loop      │  │
│  │  Sync Loop  │  │  (poll or LISTEN/NOTIFY) │  │
│  │  (every ~2s)│  │  - Detect new events     │  │
│  │             │  │  - Broadcast wakeups     │  │
│  └─────────────┘  └──────────────────────────┘  │
│                                                 │
│  ┌───────────────────────────────────────────┐  │
│  │       Per-Consumer Processing Loops       │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐ │  │
│  │  │Consumer A│  │Consumer B│  │Consumer C│ │  │
│  │  └──────────┘  └──────────┘  └──────────┘ │  │
│  └───────────────────────────────────────────┘  │
└─────────────────────────────────────────────────┘

Worker Registration and Heartbeating

On Start(), the worker inserts a row into worker_nodes with its UUID. A background goroutine updates heartbeat_at every HeartbeatInterval (default 5s). Workers that stop heartbeating for longer than HeartbeatTimeout (default 30s) are considered dead by the leader.

Leader Election

The worker uses PostgreSQL pg_try_advisory_lock with a well-known key for leader election:

  • Non-blocking: the first worker to acquire the lock becomes leader
  • Session-scoped: if the leader crashes, its database session dies and the lock is automatically released
  • All workers attempt to acquire the lock periodically, so when the leader goes away another worker takes over seamlessly
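A sketch of the acquisition step using only database/sql. The FNV-based key derivation is an assumption for illustration (the library uses its own well-known key), and tryBecomeLeader must run on a dedicated connection so the lock stays tied to one session:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"hash/fnv"
)

// advisoryKey derives a stable int64 advisory-lock key from a name.
// Hashing a name like this is one common convention, not necessarily
// the key the library itself uses.
func advisoryKey(name string) int64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	return int64(h.Sum64())
}

// tryBecomeLeader attempts the non-blocking lock. If it returns true,
// this session is the leader until its connection closes, at which
// point PostgreSQL releases the lock automatically.
func tryBecomeLeader(ctx context.Context, conn *sql.Conn, key int64) (bool, error) {
	var got bool
	err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", key).Scan(&got)
	return got, err
}

func main() {
	// The key must be deterministic so every worker contends for the same lock.
	fmt.Println(advisoryKey("worker_leader") == advisoryKey("worker_leader")) // prints "true"
}
```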

Consumer Assignment

The leader runs a rebalance cycle every RebalanceInterval (default 5s):

  1. List all live workers (heartbeat within HeartbeatTimeout)
  2. Sort consumers alphabetically by name
  3. Sort workers by UUID
  4. Assign consumers round-robin across workers
  5. Write assignments to consumer_assignments

All workers periodically sync their assignments from the database (every ~2s). When assignments change, workers start newly assigned consumers and stop unassigned ones.
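The rebalance cycle above can be sketched as a pure function; the names are illustrative, but the output matches the assignment tables under Scaling:

```go
package main

import (
	"fmt"
	"sort"
)

// rebalance mirrors the leader's algorithm: sort consumers alphabetically,
// sort workers by ID, then assign consumers round-robin across workers.
func rebalance(consumers, workers []string) map[string]string {
	sort.Strings(consumers)
	sort.Strings(workers)
	assignments := make(map[string]string, len(consumers))
	for i, c := range consumers {
		assignments[c] = workers[i%len(workers)]
	}
	return assignments
}

func main() {
	a := rebalance(
		[]string{"Orders", "Billing", "Email", "Analytics", "Inventory", "Shipping"},
		[]string{"worker-1", "worker-2"},
	)
	fmt.Println(a["Analytics"], a["Billing"], a["Email"]) // prints "worker-1 worker-2 worker-1"
}
```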

Event Dispatching

The dispatcher detects new events in the global log and broadcasts wakeup signals to all consumer processing loops. Two strategies are available; see Dispatcher Strategies.

Per-Consumer Processing Loop

Each assigned consumer runs its own processing loop:

  1. Open a transaction
  2. Lock the assignment row (SELECT ... FOR UPDATE) to verify ownership
  3. Read the checkpoint (last processed global position)
  4. Fetch a batch from the event store (up to BatchSize events)
  5. Filter in-memory for ScopedConsumer (by AggregateTypes())
  6. Call consumer.Handle() for each event within the same transaction
  7. Save the new checkpoint (advance position)
  8. Commit the transaction; the read model updates and the checkpoint advance are atomic
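A compressed sketch of this loop using only database/sql. The table and column names follow Database Setup; the direct query against an events table and the position-only handler signature are simplifying assumptions (the real worker fetches batches through the event store API):

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// applyBatch hands each event position to the handler in order and returns
// the new checkpoint. On error the caller rolls the transaction back, so a
// partially advanced value is never persisted.
func applyBatch(last int64, positions []int64, handle func(pos int64) error) (int64, error) {
	for _, pos := range positions {
		if err := handle(pos); err != nil {
			return last, err
		}
		last = pos
	}
	return last, nil
}

// processOnce sketches steps 1-8 for a single consumer.
func processOnce(ctx context.Context, db *sql.DB, consumer string, batchSize int, handle func(pos int64) error) error {
	tx, err := db.BeginTx(ctx, nil) // 1. open a transaction
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// 2. lock the assignment row to confirm this worker still owns the consumer
	var workerID string
	if err := tx.QueryRowContext(ctx,
		`SELECT worker_id FROM consumer_assignments WHERE consumer_name = $1 FOR UPDATE`,
		consumer).Scan(&workerID); err != nil {
		return err
	}

	// 3. read the checkpoint
	var last int64
	if err := tx.QueryRowContext(ctx,
		`SELECT last_position FROM consumer_checkpoints WHERE consumer_name = $1`,
		consumer).Scan(&last); err != nil {
		return err
	}

	// 4. fetch a batch of positions past the checkpoint
	rows, err := tx.QueryContext(ctx,
		`SELECT global_position FROM events WHERE global_position > $1 ORDER BY global_position LIMIT $2`,
		last, batchSize)
	if err != nil {
		return err
	}
	var positions []int64
	for rows.Next() {
		var pos int64
		if err := rows.Scan(&pos); err != nil {
			rows.Close()
			return err
		}
		positions = append(positions, pos)
	}
	rows.Close() // release the connection before issuing further statements
	if err := rows.Err(); err != nil {
		return err
	}

	// 5-6. handle each event inside the same transaction
	newLast, err := applyBatch(last, positions, handle)
	if err != nil {
		return err // rollback: checkpoint stays put, the batch retries next cycle
	}

	// 7. advance the checkpoint in the same transaction
	if _, err := tx.ExecContext(ctx,
		`UPDATE consumer_checkpoints SET last_position = $1, updated_at = NOW() WHERE consumer_name = $2`,
		newLast, consumer); err != nil {
		return err
	}
	return tx.Commit() // 8. read model and checkpoint commit atomically
}

func main() {
	pos, _ := applyBatch(10, []int64{11, 12, 13}, func(int64) error { return nil })
	fmt.Println(pos) // prints 13
}
```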

Configuration

All configuration is done via functional options passed to worker.New().

Processing Options

Option                 | Default | Description
WithBatchSize(n)       | 100     | Maximum events fetched per processing cycle
WithPollInterval(d)    | 1s      | Base wait between polls when no events are found
WithMaxPollInterval(d) | 30s     | Upper bound for exponential backoff during idle periods
WithBatchPause(d)      | 200ms   | Pause between consecutive non-empty batches
WithBatchTimeout(d)    | 30s     | Maximum duration for processing a single batch

Coordination Options

Option                   | Default | Description
WithHeartbeatInterval(d) | 5s      | How often the worker updates its heartbeat
WithHeartbeatTimeout(d)  | 30s     | Heartbeat age after which a worker is considered dead
WithRebalanceInterval(d) | 5s      | How often the leader runs rebalance cycles

Dispatcher Options

Option                        | Default         | Description
WithDispatcherStrategy(s)     | "poll"          | Strategy for detecting new events: "poll" or "notify"
WithDispatcherInterval(d)     | 200ms           | How often the dispatcher checks for new events (poll strategy)
WithNotifyConnectionString(s) | (none)          | Dedicated connection string for LISTEN/NOTIFY (required for notify strategy)
WithNotifyChannel(ch)         | "worker_events" | PostgreSQL NOTIFY channel name

Reliability Options

Option                        | Default | Description
WithMaxConsecutiveFailures(n) | 5       | Maximum consecutive Handle() errors before fatal shutdown

Infrastructure Options

Option                             | Default                | Description
WithLogger(l)                      | (none)                 | Logger implementation for structured logging
WithWorkerNodesTable(name)         | "worker_nodes"         | Custom table name for worker registration
WithConsumerAssignmentsTable(name) | "consumer_assignments" | Custom table name for consumer assignments
WithConsumerCheckpointsTable(name) | "consumer_checkpoints" | Custom table name for consumer checkpoints

Example: Production Configuration

w := worker.New(db, eventStore, consumers,
    worker.WithBatchSize(200),
    worker.WithPollInterval(500*time.Millisecond),
    worker.WithMaxPollInterval(15*time.Second),
    worker.WithBatchPause(100*time.Millisecond),
    worker.WithBatchTimeout(30*time.Second),
    worker.WithHeartbeatInterval(5*time.Second),
    worker.WithHeartbeatTimeout(30*time.Second),
    worker.WithRebalanceInterval(5*time.Second),
    worker.WithMaxConsecutiveFailures(10),
    worker.WithDispatcherStrategy(worker.DispatcherStrategyNotify),
    worker.WithNotifyConnectionString(connStr),
    worker.WithNotifyChannel("app_events"),
    worker.WithLogger(logger),
)

Dispatcher Strategies

The dispatcher is responsible for detecting new events and waking up consumer processing loops. Two strategies are available.

Poll Strategy (Default)

w := worker.New(db, eventStore, consumers,
    worker.WithDispatcherStrategy(worker.DispatcherStrategyPoll),
    worker.WithDispatcherInterval(200*time.Millisecond),
)

A background goroutine queries MAX(global_position) at the configured interval. When the position advances, it broadcasts a wakeup signal to all consumer processing loops.

When to use:

  • Works with any PostgreSQL setup, including PgBouncer in transaction pooling mode
  • Simplest to operate: no additional connection requirements
  • Good default for most workloads

Trade-off: Latency floor is bounded by DispatcherInterval (default 200ms).

Notify Strategy

w := worker.New(db, eventStore, consumers,
    worker.WithDispatcherStrategy(worker.DispatcherStrategyNotify),
    worker.WithNotifyConnectionString("postgres://localhost:5432/myapp?sslmode=disable"),
    worker.WithNotifyChannel("app_events"),
)

Uses PostgreSQL LISTEN/NOTIFY for near-instant event detection. When the event store appends events and sends a NOTIFY, the worker receives it immediately and wakes up consumers.

The notify strategy maintains a dedicated listener connection and falls back to 1-second reconciliation polling to handle any missed notifications.

Warning

The notify strategy requires a dedicated PostgreSQL connection for LISTEN. This connection cannot go through PgBouncer in transaction pooling mode, because LISTEN is session-scoped. Use a direct connection string to PostgreSQL.

When to use:

  • You need lowest possible latency (sub-millisecond wakeup)
  • You have a direct connection to PostgreSQL (no PgBouncer, or PgBouncer in session mode)

Important: The NotifyChannel must match between the store configuration (which sends NOTIFY on event append) and the worker configuration (which LISTENs). If you configured a custom channel in the store, use the same channel name here.

Scaling

Scaling is automatic. Just deploy more worker instances; the leader detects new workers and rebalances consumers.

How It Works

  1. New worker starts and registers in worker_nodes
  2. Leader detects the new worker on its next rebalance cycle (~5s)
  3. Leader computes new round-robin assignment and writes to consumer_assignments
  4. All workers sync their assignments and start/stop consumers accordingly

Scaling down works the same way in reverse: when a worker stops, it stops heartbeating. The leader detects it as dead after HeartbeatTimeout and reassigns its consumers.

Example: 6 Consumers Across Workers

Consumers (sorted): Analytics, Billing, Email, Inventory, Orders, Shipping

1 worker:

Worker   | Consumers
worker-1 | Analytics, Billing, Email, Inventory, Orders, Shipping

2 workers:

Worker   | Consumers
worker-1 | Analytics, Email, Orders
worker-2 | Billing, Inventory, Shipping

3 workers:

Worker   | Consumers
worker-1 | Analytics, Inventory
worker-2 | Billing, Orders
worker-3 | Email, Shipping

6 workers:

Worker   | Consumers
worker-1 | Analytics
worker-2 | Billing
worker-3 | Email
worker-4 | Inventory
worker-5 | Orders
worker-6 | Shipping

Beyond 6 workers (with 6 consumers), additional workers remain idle as hot standbys, ready to take over if another worker fails.

Processing Semantics

Exactly-Once Within PostgreSQL

When a consumer writes its read model to PostgreSQL using the provided transaction, updates are exactly-once: the read model change and the checkpoint advance happen in the same transaction. If the transaction is rolled back for any reason, both the read model and the checkpoint remain unchanged.

func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    // This write and the checkpoint update are in the same transaction
    _, err := tx.ExecContext(ctx, "INSERT INTO order_view ...")
    return err
}

At-Least-Once for External Systems

If your consumer writes to an external system (sends an email, calls an API, publishes to a message queue), the guarantee is at-least-once. If the transaction commits but the external call was already made before a crash, the event will be reprocessed on restart. Design external consumers to be idempotent.
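One common way to make an external-effect consumer idempotent is to key the side effect on the event's global position. This sketch uses an in-memory set for brevity; production code would use a durable dedupe store, and all names here are illustrative:

```go
package main

import "fmt"

// EmailSender records the global position of every email already sent and
// skips duplicates, so at-least-once redelivery never sends twice.
type EmailSender struct {
	sent map[int64]bool // in production this lives in a durable store
}

// Send returns true if it performed the side effect, false if the event
// was already handled (a redelivery after a crash).
func (s *EmailSender) Send(position int64, to string) bool {
	if s.sent[position] {
		return false // already handled: redelivery is a no-op
	}
	s.sent[position] = true
	// ... call the email API here ...
	return true
}

func main() {
	s := &EmailSender{sent: map[int64]bool{}}
	fmt.Println(s.Send(42, "a@example.com")) // first delivery: prints "true"
	fmt.Println(s.Send(42, "a@example.com")) // redelivery: prints "false"
}
```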

Adaptive Polling

The per-consumer processing loop uses adaptive polling to balance latency and resource usage:

  • Base interval: PollInterval (default 1s)
  • No events found: exponential backoff up to MaxPollInterval (default 30s)
  • Events found: reset to base interval
  • Full batch: process next batch immediately (with BatchPause between batches)
  • Wakeup signal from dispatcher: reset interval and process immediately

This means the worker automatically scales its polling rate: aggressive when events are flowing, backing off when idle.

Failure Handling

When Handle() returns an error:

  1. The transaction is rolled back β€” checkpoint is not advanced
  2. The consumer retries from the same position on the next cycle
  3. A consecutive failure counter increments

If the counter reaches MaxConsecutiveFailures (default 5), the entire worker shuts down with ErrConsecutiveFailures. This is a safety mechanism: a consumer stuck in a failure loop should not silently fall behind.

To recover, fix the underlying issue (bad data, external service down, bug in handler) and restart the worker. Processing resumes from the last committed checkpoint.

Graceful Shutdown

The worker supports two shutdown mechanisms:

Context Cancellation

Cancel the context passed to Start():

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer cancel()

if err := w.Start(ctx); err != nil {
    log.Fatal(err)
}

When the context is canceled, the worker:

  1. Stops all consumer processing loops (waits for in-flight batches to complete)
  2. Stops background coordination loops (heartbeat, leader, assignment, dispatcher)
  3. Deregisters from worker_nodes
  4. Returns from Start()

Explicit Stop

Call w.Stop() from another goroutine:

go func() {
    <-shutdownSignal
    w.Stop()
}()

Both approaches allow in-flight transactions to complete before the worker exits. The leader will detect the missing heartbeat and reassign the worker's consumers on the next rebalance cycle.

Troubleshooting

Workers Not Picking Up Consumers

Check that workers are registered and heartbeating:

SELECT worker_id, heartbeat_at,
       NOW() - heartbeat_at AS age
FROM worker_nodes
ORDER BY heartbeat_at DESC;

Check which worker is the leader (holds the advisory lock):

SELECT pid, granted
FROM pg_locks
WHERE locktype = 'advisory'
  AND granted = true;

Check consumer assignments:

SELECT consumer_name, worker_id
FROM consumer_assignments
ORDER BY consumer_name;

If assignments are empty, the leader may not have run a rebalance cycle yet. Wait up to RebalanceInterval (default 5s) and check again.

Consumer Lag Increasing

Compare checkpoints with the latest global position:

SELECT cp.consumer_name,
       cp.last_position,
       (SELECT MAX(global_position) FROM events) AS latest,
       (SELECT MAX(global_position) FROM events) - cp.last_position AS lag
FROM consumer_checkpoints cp
ORDER BY lag DESC;

If lag is growing:

  • Increase BatchSize to process more events per cycle
  • Reduce PollInterval to poll more aggressively
  • Profile your Handle() function; slow handlers are the most common bottleneck
  • Check database performance; slow queries in handlers amplify lag

Connection Exhaustion

Symptoms: "too many connections" errors from PostgreSQL, workers timing out on database calls.

Each worker needs connections for:

  • Heartbeat updates
  • Leader operations (advisory lock, rebalance queries)
  • Per-consumer processing (one transaction per consumer per batch)

With N consumers assigned to a worker, expect ~N+3 concurrent connections at peak.

Fixes:

  • Set db.SetMaxOpenConns() appropriately (see Deployment)
  • Ensure PostgreSQL max_connections accommodates all worker replicas
  • Formula: max_connections >= (workers * max_open_conns) + admin_headroom
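The formula as a quick calculation; the replica and pool numbers are illustrative:

```go
package main

import "fmt"

// requiredMaxConnections applies the sizing formula:
// max_connections >= (workers * max_open_conns) + admin_headroom.
func requiredMaxConnections(workers, maxOpenConns, adminHeadroom int) int {
	return workers*maxOpenConns + adminHeadroom
}

func main() {
	// e.g. 3 worker replicas, each capped at 10 open connections via
	// db.SetMaxOpenConns(10), plus 10 connections reserved for admin tasks.
	fmt.Println(requiredMaxConnections(3, 10, 10)) // prints 40
}
```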

ErrConsecutiveFailures

The worker shut down because a consumer's Handle() function returned errors MaxConsecutiveFailures times in a row (default 5).

Diagnose:

  1. Check worker logs for the failing consumer name and error messages
  2. Inspect the event at the stuck position:
SELECT * FROM events
WHERE global_position = (
    SELECT last_position + 1
    FROM consumer_checkpoints
    WHERE consumer_name = 'your-consumer'
);
  3. Fix the handler bug or data issue
  4. Restart the worker; processing resumes from the last committed checkpoint

Notify Strategy Not Working

If using DispatcherStrategyNotify and consumers aren't waking up promptly:

  • Verify the NotifyChannel matches between store and worker configs
  • Ensure NotifyConnectionString points directly to PostgreSQL, not through PgBouncer in transaction mode
  • Check PostgreSQL logs for LISTEN/NOTIFY errors
  • The fallback reconciliation poll (every ~1s) should still work; if it does, the issue is with the LISTEN connection

See Also

  • Consumers - implementing the Consumer and ScopedConsumer interfaces
  • Deployment - production deployment guide
  • Observability - monitoring and logging