Worker¶
The Worker coordinates distributed consumer processing across multiple nodes using only PostgreSQL: no external message brokers, no ZooKeeper, no Redis. Deploy one or more worker instances and they automatically elect a leader, assign consumers, and process events from the global log.
Table of Contents¶
- Installation
- Quick Start
- Database Setup
- Architecture
- Configuration
- Dispatcher Strategies
- Scaling
- Processing Semantics
- Graceful Shutdown
- Troubleshooting
Installation¶
```sh
go get github.com/pupsourcing/worker
```
Requires Go 1.24+ and PostgreSQL 12+.
Quick Start¶
```go
package main

import (
	"context"
	"database/sql"
	"log"
	"os/signal"
	"syscall"

	_ "github.com/lib/pq"
	"github.com/pupsourcing/store/consumer"
	storepostgres "github.com/pupsourcing/store/postgres"
	"github.com/pupsourcing/worker"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer cancel()

	db, err := sql.Open("postgres", "postgres://localhost:5432/myapp?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

	// OrderProjection and NotificationConsumer are user-defined consumers.
	consumers := []consumer.Consumer{
		&OrderProjection{},
		&NotificationConsumer{},
	}

	w := worker.New(db, eventStore, consumers,
		worker.WithBatchSize(100),
	)

	if err := w.Start(ctx); err != nil {
		log.Fatal(err)
	}
}
```
Start blocks until the context is canceled or a fatal error occurs. Each worker gets a unique UUID on creation, accessible via w.ID().
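The quick start assumes `OrderProjection` and `NotificationConsumer` are defined elsewhere. A minimal consumer might look like the sketch below; the real `consumer.Consumer` interface lives in `github.com/pupsourcing/store/consumer`, so the local `PersistedEvent` type and field names here are illustrative assumptions based on the examples in this document:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// PersistedEvent mirrors the shape of events delivered by the store;
// the real type comes from the pupsourcing store package (assumption).
type PersistedEvent struct {
	GlobalPosition int64
	AggregateType  string
	EventType      string
	Payload        []byte
}

// OrderProjection maintains an order read model.
type OrderProjection struct{}

// Name is the consumer identifier used as the primary key in
// consumer_assignments and consumer_checkpoints.
func (p *OrderProjection) Name() string { return "order_projection" }

// Handle applies one event inside the worker-managed transaction.
func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event PersistedEvent) error {
	_, err := tx.ExecContext(ctx,
		`INSERT INTO order_view (position, payload) VALUES ($1, $2)`,
		event.GlobalPosition, event.Payload)
	return err
}

func main() {
	fmt.Println((&OrderProjection{}).Name()) // prints order_projection
}
```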
Database Setup¶
The worker requires three tables in PostgreSQL. Generate the migration using the built-in helper:
```go
import "github.com/pupsourcing/worker/migrations"

config := &migrations.Config{
	OutputFolder:             "./db/migrations",
	OutputFilename:           "002_worker.sql",
	WorkerNodesTable:         "worker_nodes",
	ConsumerAssignmentsTable: "consumer_assignments",
	ConsumerCheckpointsTable: "consumer_checkpoints",
}

migrations.GeneratePostgres(config)
```
This produces a migration file that creates the following tables:
worker_nodes¶
Tracks live worker instances. Each worker registers itself on startup and updates its heartbeat periodically.
| Column | Type | Description |
|---|---|---|
| worker_id | UUID (PK) | Unique worker identifier |
| heartbeat_at | TIMESTAMPTZ | Last heartbeat timestamp |
| created_at | TIMESTAMPTZ | When the worker first registered |
| updated_at | TIMESTAMPTZ | Last update time |
Includes an index on heartbeat_at for efficient stale-worker detection.
consumer_assignments¶
Maps each consumer to the worker responsible for processing it.
| Column | Type | Description |
|---|---|---|
| consumer_name | TEXT (PK) | Consumer identifier (from Name()) |
| worker_id | UUID | Assigned worker |
| created_at | TIMESTAMPTZ | When the assignment was created |
| updated_at | TIMESTAMPTZ | Last update time |
Includes an index on worker_id for efficient per-worker lookups.
consumer_checkpoints¶
Stores the last processed position for each consumer in the global event log.
| Column | Type | Description |
|---|---|---|
| consumer_name | TEXT (PK) | Consumer identifier |
| last_position | BIGINT | Last processed global position |
| created_at | TIMESTAMPTZ | When the checkpoint was created |
| updated_at | TIMESTAMPTZ | Last update time |
Architecture¶
When a worker starts, it launches several concurrent background loops that coordinate processing across the cluster:
```
┌─────────────────────────────────────────────────┐
│                   Worker Node                   │
│                                                 │
│  ┌─────────────┐  ┌──────────────────────────┐  │
│  │  Heartbeat  │  │       Leader Loop        │  │
│  │    Loop     │  │ (pg_try_advisory_lock)   │  │
│  │ (every ~5s) │  │ - List live workers      │  │
│  │             │  │ - Rebalance consumers    │  │
│  └─────────────┘  └──────────────────────────┘  │
│                                                 │
│  ┌─────────────┐  ┌──────────────────────────┐  │
│  │ Assignment  │  │     Dispatcher Loop      │  │
│  │  Sync Loop  │  │ (poll or LISTEN/NOTIFY)  │  │
│  │ (every ~2s) │  │ - Detect new events      │  │
│  │             │  │ - Broadcast wakeups      │  │
│  └─────────────┘  └──────────────────────────┘  │
│                                                 │
│  ┌───────────────────────────────────────────┐  │
│  │       Per-Consumer Processing Loops       │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐   │  │
│  │  │Consumer A│ │Consumer B│ │Consumer C│   │  │
│  │  └──────────┘ └──────────┘ └──────────┘   │  │
│  └───────────────────────────────────────────┘  │
└─────────────────────────────────────────────────┘
```
Worker Registration and Heartbeating¶
On Start(), the worker inserts a row into worker_nodes with its UUID. A background goroutine updates heartbeat_at every HeartbeatInterval (default 5s). Workers that stop heartbeating for longer than HeartbeatTimeout (default 30s) are considered dead by the leader.
Leader Election¶
The worker uses PostgreSQL pg_try_advisory_lock with a well-known key for leader election:
- Non-blocking: the first worker to acquire the lock becomes leader
- Session-scoped: if the leader crashes, its database session dies and the lock is automatically released
- All workers attempt to acquire the lock periodically; when the leader goes away, another worker takes over seamlessly
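Conceptually, the election is a single non-blocking call. The sketch below is illustrative, not the library's internals; `leaderLockKey` shows one way to derive a stable 64-bit lock key from a name (the key the worker actually uses is an internal detail):

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"hash/fnv"
)

// leaderLockKey derives a stable 64-bit advisory-lock key from a name.
func leaderLockKey(name string) int64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	return int64(h.Sum64())
}

// tryBecomeLeader attempts to take the session-scoped advisory lock.
// It returns true if this session now holds the lock; the lock is
// released automatically when the session (connection) ends.
func tryBecomeLeader(ctx context.Context, conn *sql.Conn, key int64) (bool, error) {
	var acquired bool
	err := conn.QueryRowContext(ctx,
		"SELECT pg_try_advisory_lock($1)", key).Scan(&acquired)
	return acquired, err
}

func main() {
	fmt.Println(leaderLockKey("worker_leader") == leaderLockKey("worker_leader")) // prints true
}
```

Note the use of `*sql.Conn` rather than `*sql.DB`: the lock is bound to one session, so it must be acquired and held on a single pinned connection, not on a pooled handle where each query may use a different session.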
Consumer Assignment¶
The leader runs a rebalance cycle every RebalanceInterval (default 5s):
- List all live workers (heartbeat within `HeartbeatTimeout`)
- Sort consumers alphabetically by name
- Sort workers by UUID
- Assign consumers round-robin across workers
- Write assignments to `consumer_assignments`
All workers periodically sync their assignments from the database (every ~2s). When assignments change, workers start newly assigned consumers and stop unassigned ones.
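The rebalance computation itself is simple. A sketch of the round-robin step under the sorting rules above (not the library's actual code):

```go
package main

import (
	"fmt"
	"sort"
)

// assignRoundRobin maps each consumer to a worker: consumers are sorted
// by name, workers by UUID, then consumers are dealt out round-robin.
func assignRoundRobin(consumers, workerIDs []string) map[string]string {
	consumers = append([]string(nil), consumers...) // avoid mutating inputs
	workerIDs = append([]string(nil), workerIDs...)
	sort.Strings(consumers)
	sort.Strings(workerIDs)
	assignments := make(map[string]string, len(consumers))
	for i, name := range consumers {
		assignments[name] = workerIDs[i%len(workerIDs)]
	}
	return assignments
}

func main() {
	a := assignRoundRobin(
		[]string{"Email", "Orders", "Analytics", "Billing", "Shipping", "Inventory"},
		[]string{"worker-1", "worker-2"},
	)
	fmt.Println(a["Analytics"], a["Billing"], a["Email"]) // prints worker-1 worker-2 worker-1
}
```

With the six consumers from the Scaling section and two workers, this yields Analytics, Email, Orders on the first worker and Billing, Inventory, Shipping on the second.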
Event Dispatching¶
The dispatcher detects new events in the global log and broadcasts wakeup signals to all consumer processing loops. Two strategies are available; see Dispatcher Strategies.
Per-Consumer Processing Loop¶
Each assigned consumer runs its own processing loop:
- Open a transaction
- Lock the assignment row (`SELECT ... FOR UPDATE`) to verify ownership
- Read the checkpoint (last processed global position)
- Fetch a batch from the event store (up to `BatchSize` events)
- Filter in-memory for `ScopedConsumer` (by `AggregateTypes()`)
- Call `consumer.Handle()` for each event within the same transaction
- Save the new checkpoint (advance position)
- Commit the transaction: read model updates and checkpoint advance are atomic
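The steps above can be sketched as one function. This is a simplified illustration, not the worker's implementation; the `fetchBatch` callback, the local `Event` type, and the exact SQL are assumptions:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

type Event struct {
	GlobalPosition int64
}

// advance returns the checkpoint position after a successfully handled batch.
func advance(pos int64, events []Event) int64 {
	for _, e := range events {
		pos = e.GlobalPosition
	}
	return pos
}

// processOnce runs one cycle of the per-consumer loop: lock the assignment,
// read the checkpoint, handle a batch, advance the checkpoint, and commit
// everything atomically.
func processOnce(ctx context.Context, db *sql.DB, consumerName, workerID string,
	fetchBatch func(after int64, limit int) ([]Event, error),
	handle func(context.Context, *sql.Tx, Event) error,
	batchSize int) error {

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// Verify this worker still owns the consumer.
	var owner string
	if err := tx.QueryRowContext(ctx,
		`SELECT worker_id FROM consumer_assignments
		 WHERE consumer_name = $1 FOR UPDATE`, consumerName).Scan(&owner); err != nil {
		return err
	}
	if owner != workerID {
		return nil // assignment moved to another worker; do nothing
	}

	// Read the checkpoint.
	var pos int64
	if err := tx.QueryRowContext(ctx,
		`SELECT last_position FROM consumer_checkpoints
		 WHERE consumer_name = $1`, consumerName).Scan(&pos); err != nil {
		return err
	}

	events, err := fetchBatch(pos, batchSize)
	if err != nil {
		return err
	}
	for _, e := range events {
		if err := handle(ctx, tx, e); err != nil {
			return err // rollback: checkpoint is not advanced
		}
	}
	pos = advance(pos, events)

	// Advance the checkpoint in the same transaction as the handlers.
	if _, err := tx.ExecContext(ctx,
		`UPDATE consumer_checkpoints
		 SET last_position = $1, updated_at = NOW()
		 WHERE consumer_name = $2`, pos, consumerName); err != nil {
		return err
	}
	return tx.Commit()
}

func main() {
	fmt.Println(advance(5, []Event{{GlobalPosition: 6}, {GlobalPosition: 9}})) // prints 9
}
```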
Configuration¶
All configuration is done via functional options passed to worker.New().
Processing Options¶
| Option | Default | Description |
|---|---|---|
| `WithBatchSize(n)` | 100 | Maximum events fetched per processing cycle |
| `WithPollInterval(d)` | 1s | Base wait between polls when no events are found |
| `WithMaxPollInterval(d)` | 30s | Upper bound for exponential backoff during idle periods |
| `WithBatchPause(d)` | 200ms | Pause between consecutive non-empty batches |
| `WithBatchTimeout(d)` | 30s | Maximum duration for processing a single batch |
Coordination Options¶
| Option | Default | Description |
|---|---|---|
| `WithHeartbeatInterval(d)` | 5s | How often the worker updates its heartbeat |
| `WithHeartbeatTimeout(d)` | 30s | Heartbeat age after which a worker is considered dead |
| `WithRebalanceInterval(d)` | 5s | How often the leader runs rebalance cycles |
Dispatcher Options¶
| Option | Default | Description |
|---|---|---|
| `WithDispatcherStrategy(s)` | `"poll"` | Strategy for detecting new events: `"poll"` or `"notify"` |
| `WithDispatcherInterval(d)` | 200ms | How often the dispatcher checks for new events (poll strategy) |
| `WithNotifyConnectionString(s)` | (none) | Dedicated connection string for LISTEN/NOTIFY (required for notify strategy) |
| `WithNotifyChannel(ch)` | `"worker_events"` | PostgreSQL NOTIFY channel name |
Reliability Options¶
| Option | Default | Description |
|---|---|---|
| `WithMaxConsecutiveFailures(n)` | 5 | Maximum consecutive `Handle()` errors before fatal shutdown |
Infrastructure Options¶
| Option | Default | Description |
|---|---|---|
| `WithLogger(l)` | (none) | Logger implementation for structured logging |
| `WithWorkerNodesTable(name)` | `"worker_nodes"` | Custom table name for worker registration |
| `WithConsumerAssignmentsTable(name)` | `"consumer_assignments"` | Custom table name for consumer assignments |
| `WithConsumerCheckpointsTable(name)` | `"consumer_checkpoints"` | Custom table name for consumer checkpoints |
Example: Production Configuration¶
```go
w := worker.New(db, eventStore, consumers,
	worker.WithBatchSize(200),
	worker.WithPollInterval(500*time.Millisecond),
	worker.WithMaxPollInterval(15*time.Second),
	worker.WithBatchPause(100*time.Millisecond),
	worker.WithBatchTimeout(30*time.Second),
	worker.WithHeartbeatInterval(5*time.Second),
	worker.WithHeartbeatTimeout(30*time.Second),
	worker.WithRebalanceInterval(5*time.Second),
	worker.WithMaxConsecutiveFailures(10),
	worker.WithDispatcherStrategy(worker.DispatcherStrategyNotify),
	worker.WithNotifyConnectionString(connStr),
	worker.WithNotifyChannel("app_events"),
	worker.WithLogger(logger),
)
```
Dispatcher Strategies¶
The dispatcher is responsible for detecting new events and waking up consumer processing loops. Two strategies are available.
Poll Strategy (Default)¶
```go
w := worker.New(db, eventStore, consumers,
	worker.WithDispatcherStrategy(worker.DispatcherStrategyPoll),
	worker.WithDispatcherInterval(200*time.Millisecond),
)
```
A background goroutine queries MAX(global_position) at the configured interval. When the position advances, it broadcasts a wakeup signal to all consumer processing loops.
When to use:
- Works with any PostgreSQL setup, including PgBouncer in transaction pooling mode
- Simplest to operate: no additional connection requirements
- Good default for most workloads
Trade-off: Latency floor is bounded by DispatcherInterval (default 200ms).
Notify Strategy¶
```go
w := worker.New(db, eventStore, consumers,
	worker.WithDispatcherStrategy(worker.DispatcherStrategyNotify),
	worker.WithNotifyConnectionString("postgres://localhost:5432/myapp?sslmode=disable"),
	worker.WithNotifyChannel("app_events"),
)
```
Uses PostgreSQL LISTEN/NOTIFY for near-instant event detection. When the event store appends events and sends a NOTIFY, the worker receives it immediately and wakes up consumers.
The notify strategy maintains a dedicated listener connection and falls back to 1-second reconciliation polling to handle any missed notifications.
Warning
The notify strategy requires a dedicated PostgreSQL connection for LISTEN. This connection cannot go through PgBouncer in transaction pooling mode, because LISTEN is session-scoped. Use a direct connection string to PostgreSQL.
When to use:
- You need the lowest possible latency (near-instant wakeup)
- You have a direct connection to PostgreSQL (no PgBouncer, or PgBouncer in session mode)
Important: The NotifyChannel must match between the store configuration (which sends NOTIFY on event append) and the worker configuration (which LISTENs). If you configured a custom channel in the store, use the same channel name here.
Scaling¶
Scaling is automatic. Deploy more worker instances and the leader detects the new workers and rebalances consumers.
How It Works¶
- New worker starts and registers in `worker_nodes`
- Leader detects the new worker on its next rebalance cycle (~5s)
- Leader computes a new round-robin assignment and writes it to `consumer_assignments`
- All workers sync their assignments and start/stop consumers accordingly
Scaling down works the same way in reverse: when a worker stops, it stops heartbeating. The leader detects it as dead after HeartbeatTimeout and reassigns its consumers.
Example: 6 Consumers Across Workers¶
Consumers (sorted): Analytics, Billing, Email, Inventory, Orders, Shipping
1 worker:
| Worker | Consumers |
|---|---|
| worker-1 | Analytics, Billing, Email, Inventory, Orders, Shipping |
2 workers:
| Worker | Consumers |
|---|---|
| worker-1 | Analytics, Email, Orders |
| worker-2 | Billing, Inventory, Shipping |
3 workers:
| Worker | Consumers |
|---|---|
| worker-1 | Analytics, Inventory |
| worker-2 | Billing, Orders |
| worker-3 | Email, Shipping |
6 workers:
| Worker | Consumers |
|---|---|
| worker-1 | Analytics |
| worker-2 | Billing |
| worker-3 | Email |
| worker-4 | Inventory |
| worker-5 | Orders |
| worker-6 | Shipping |
Beyond 6 workers (with 6 consumers), additional workers remain idle as hot standbys, ready to take over if another worker fails.
Processing Semantics¶
Exactly-Once Within PostgreSQL¶
When a consumer writes its read model to PostgreSQL using the provided transaction, updates are exactly-once: the read model change and the checkpoint advance happen in the same transaction. If the transaction is rolled back for any reason, both the read model and the checkpoint remain unchanged.
```go
func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	// This write and the checkpoint update are in the same transaction
	_, err := tx.ExecContext(ctx, "INSERT INTO order_view ...")
	return err
}
```
At-Least-Once for External Systems¶
If your consumer writes to an external system (sends an email, calls an API, publishes to a message queue), the guarantee is at-least-once. If the transaction commits but the external call was already made before a crash, the event will be reprocessed on restart. Design external consumers to be idempotent.
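One common idempotency pattern is to record a per-event key in a processed-log table inside the worker's transaction and skip the external call when the key is already present. The `processed_log` table and both helpers below are hypothetical, sketched for illustration:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// dedupKey builds a stable idempotency key for one consumer/event pair.
func dedupKey(consumer string, globalPosition int64) string {
	return fmt.Sprintf("%s:%d", consumer, globalPosition)
}

// sendOnce performs the external side effect at most once in the happy
// path. If the worker crashes after send() but before Commit, the event
// is reprocessed and send() runs again, so the external call itself must
// still tolerate duplicates.
func sendOnce(ctx context.Context, tx *sql.Tx, key string, send func() error) error {
	// processed_log is a hypothetical table with a unique key column.
	res, err := tx.ExecContext(ctx,
		`INSERT INTO processed_log (key) VALUES ($1)
		 ON CONFLICT (key) DO NOTHING`, key)
	if err != nil {
		return err
	}
	if n, _ := res.RowsAffected(); n == 0 {
		return nil // already handled by a previously committed transaction
	}
	return send()
}

func main() {
	fmt.Println(dedupKey("email_notifier", 42)) // prints email_notifier:42
}
```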
Adaptive Polling¶
The per-consumer processing loop uses adaptive polling to balance latency and resource usage:
- Base interval: `PollInterval` (default 1s)
- No events found: exponential backoff up to `MaxPollInterval` (default 30s)
- Events found: reset to base interval
- Full batch: process the next batch immediately (with `BatchPause` between batches)
- Wakeup signal from dispatcher: reset interval and process immediately
This means the worker automatically scales its polling rate: aggressive when events are flowing, backing off when idle.
Failure Handling¶
When Handle() returns an error:
- The transaction is rolled back, so the checkpoint is not advanced
- The consumer retries from the same position on the next cycle
- A consecutive failure counter increments
If the counter reaches MaxConsecutiveFailures (default 5), the entire worker shuts down with ErrConsecutiveFailures. This is a safety mechanism: a consumer stuck in a failure loop should not silently fall behind.
To recover, fix the underlying issue (bad data, external service down, bug in handler) and restart the worker. Processing resumes from the last committed checkpoint.
Graceful Shutdown¶
The worker supports two shutdown mechanisms:
Context Cancellation¶
Cancel the context passed to Start():
```go
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer cancel()

if err := w.Start(ctx); err != nil {
	log.Fatal(err)
}
```
When the context is canceled, the worker:
- Stops all consumer processing loops (waits for in-flight batches to complete)
- Stops background coordination loops (heartbeat, leader, assignment, dispatcher)
- Deregisters from `worker_nodes`
- Returns from `Start()`
Explicit Stop¶
Call w.Stop() from another goroutine:
```go
go func() {
	<-shutdownSignal
	w.Stop()
}()
```
Both approaches allow in-flight transactions to complete before the worker exits. The leader will detect the missing heartbeat and reassign the worker's consumers on the next rebalance cycle.
Troubleshooting¶
Workers Not Picking Up Consumers¶
Check that workers are registered and heartbeating:
```sql
SELECT worker_id, heartbeat_at,
       NOW() - heartbeat_at AS age
FROM worker_nodes
ORDER BY heartbeat_at DESC;
```
Check which worker is the leader (holds the advisory lock):
```sql
SELECT pid, granted
FROM pg_locks
WHERE locktype = 'advisory'
  AND granted = true;
```
Check consumer assignments:
```sql
SELECT consumer_name, worker_id
FROM consumer_assignments
ORDER BY consumer_name;
```
If assignments are empty, the leader may not have run a rebalance cycle yet. Wait up to RebalanceInterval (default 5s) and check again.
Consumer Lag Increasing¶
Compare checkpoints with the latest global position:
```sql
SELECT cp.consumer_name,
       cp.last_position,
       (SELECT MAX(global_position) FROM events) AS latest,
       (SELECT MAX(global_position) FROM events) - cp.last_position AS lag
FROM consumer_checkpoints cp
ORDER BY lag DESC;
```
If lag is growing:
- Increase `BatchSize` to process more events per cycle
- Reduce `PollInterval` to poll more aggressively
- Profile your `Handle()` function; slow handlers are the most common bottleneck
- Check database performance; slow queries in handlers amplify lag
Connection Exhaustion¶
Symptoms: `too many connections` errors, workers timing out on database calls.
Each worker needs connections for:
- Heartbeat updates
- Leader operations (advisory lock, rebalance queries)
- Per-consumer processing (one transaction per consumer per batch)
With N consumers assigned to a worker, expect ~N+3 concurrent connections at peak.
Fixes:
- Set `db.SetMaxOpenConns()` appropriately (see Deployment)
- Ensure PostgreSQL `max_connections` accommodates all worker replicas
- Formula: `max_connections >= (workers × max_open_conns) + admin_headroom`
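As a worked example of that formula (the helper function and the numbers are illustrative):

```go
package main

import "fmt"

// requiredMaxConnections applies the sizing formula:
// max_connections >= (workers × max_open_conns) + admin_headroom.
func requiredMaxConnections(workers, maxOpenConnsPerWorker, adminHeadroom int) int {
	return workers*maxOpenConnsPerWorker + adminHeadroom
}

func main() {
	// 4 worker replicas capped at db.SetMaxOpenConns(10) each,
	// plus 5 connections reserved for admin tooling.
	fmt.Println(requiredMaxConnections(4, 10, 5)) // prints 45
}
```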
ErrConsecutiveFailures¶
The worker shut down because a consumer's Handle() function returned errors MaxConsecutiveFailures times in a row (default 5).
Diagnose:
- Check worker logs for the failing consumer name and error messages
- Inspect the event at the stuck position:

  ```sql
  SELECT * FROM events
  WHERE global_position = (
      SELECT last_position + 1
      FROM consumer_checkpoints
      WHERE consumer_name = 'your-consumer'
  );
  ```

- Fix the handler bug or data issue
- Restart the worker; processing resumes from the last committed checkpoint
Notify Strategy Not Working¶
If using DispatcherStrategyNotify and consumers aren't waking up promptly:
- Verify the `NotifyChannel` matches between store and worker configs
- Ensure `NotifyConnectionString` points directly to PostgreSQL, not through PgBouncer in transaction mode
- Check PostgreSQL logs for LISTEN/NOTIFY errors
- The fallback reconciliation poll (every ~1s) should still work; if it does, the issue is with the LISTEN connection
See Also¶
- Consumers: implementing the Consumer and ScopedConsumer interfaces
- Deployment: production deployment guide
- Observability: monitoring and logging