Consumers¶
Guide to building and running consumers in pupsourcing.
Table of Contents¶
- Consumers Overview
- Consumer Interface
- Projections as Scoped Consumers
- Running Consumers with Worker
- Worker Configuration
- Wakeup Sources
- One-Off Consumer Processing
- Integration Testing with RunModeOneOff
- Consumer Guarantees
Consumers Overview¶
A consumer processes persisted events and advances a checkpoint.
A projection is a specific type of consumer that builds query-optimized read models.
Events -> Consumer Handle() -> Side effects (SQL read model, broker publish, cache, etc.)
Use consumers for:
- Read models and query tables
- Outbox/integration publishing
- Audit pipelines
- Derived metrics and materialized views
Consumer Interface¶
Consumers implement consumer.Consumer:
package myapp
import (
"context"
"database/sql"
"github.com/pupsourcing/core/es"
)
type UserProjection struct{}
func (p *UserProjection) Name() string {
return "user_projection"
}
func (p *UserProjection) Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error {
// Use tx for SQL read models stored in the same database.
// The processor commits/rolls back tx.
_, err := tx.ExecContext(ctx,
"INSERT INTO user_events (event_id, event_type) VALUES ($1, $2) ON CONFLICT (event_id) DO NOTHING",
event.EventID,
event.EventType,
)
return err
}
Projections as Scoped Consumers¶
For read models, you typically scope consumers by aggregate type and bounded context.
package myapp
import (
"context"
"database/sql"
"github.com/pupsourcing/core/es"
)
type UserReadModel struct{}
func (p *UserReadModel) Name() string { return "user_read_model" }
func (p *UserReadModel) AggregateTypes() []string {
return []string{"User"}
}
func (p *UserReadModel) BoundedContexts() []string {
return []string{"Identity"}
}
func (p *UserReadModel) Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error {
_, err := tx.ExecContext(ctx,
"INSERT INTO users_view (user_id, event_type, created_at) VALUES ($1, $2, $3)",
event.AggregateID,
event.EventType,
event.CreatedAt,
)
return err
}
Running Consumers with Worker¶
Use the Worker API for continuous consumer processing.
package main
import (
"context"
"database/sql"
"github.com/pupsourcing/core/es/adapters/postgres"
)
func run(ctx context.Context, db *sql.DB) error {
store := postgres.NewStore(postgres.DefaultStoreConfig())
w := postgres.NewWorker(db, store)
return w.Run(ctx,
&UserReadModel{},
&OrderProjection{},
&BillingOutboxConsumer{},
)
}
Each worker instance automatically claims a fair share of segments per consumer.
Worker Configuration¶
Customize worker behavior with options:
package main
import (
"time"
"github.com/pupsourcing/core/es/adapters/postgres"
"github.com/pupsourcing/core/es/worker"
)
w := postgres.NewWorker(db, store,
worker.WithTotalSegments(4), // Start conservative; increase only when needed.
worker.WithBatchSize(200), // Processes up to 200 events per fetch.
worker.WithPollInterval(500*time.Millisecond), // Base polling interval between event checks.
worker.WithMaxPollInterval(30*time.Second), // Backs off polling to reduce idle DB load.
worker.WithMaxPostBatchPause(100*time.Millisecond), // Caps adaptive post-batch pause under sustained load.
worker.WithHeartbeatInterval(5*time.Second), // Reports liveness while owning segments.
worker.WithStaleThreshold(30*time.Second), // Lets others reclaim segments from dead workers.
worker.WithRebalanceInterval(10*time.Second), // Rebalances segment ownership across workers.
)
err := w.Run(ctx, &UserReadModel{}, &OrderProjection{})
TotalSegments defines the parallelism ceiling per consumer. Start with 4 and increase only when you have clear evidence of bottlenecks. See Deployment — Choosing TotalSegments for detailed guidance.
MaxPostBatchPause controls adaptive throttling after successful non-empty batches in continuous mode. The pause grows under sustained full batches, shrinks/resets as load cools, and is capped by this value (default 100ms). Set it to <= 0 to disable adaptive post-batch pause.
Wakeup Sources¶
A wakeup source tells segment processors when new events are available. Without one, segments rely purely on polling — checking the database at regular intervals. With a wakeup source, segments sleep until a signal arrives, then immediately check for work.
pupsourcing ships with two wakeup source implementations:
Built-in Polling Dispatcher (default)¶
The Worker includes a built-in Dispatcher that periodically reads the latest global position from the events table and broadcasts a wake signal to all segment processors when it advances.
- Pros: Zero configuration, works everywhere
- Cons: Adds periodic read queries even when no events are flowing. At idle, the floor is
total_owned_segments / MaxPollIntervalqueries per second per worker.
This is enabled by default. You can disable it with worker.WithEnableDispatcher(false).
NotifyDispatcher (LISTEN/NOTIFY)¶
The NotifyDispatcher uses PostgreSQL's native LISTEN/NOTIFY mechanism for instant event notifications. When an event is appended, pg_notify fires inside the same transaction — so the notification arrives only when the transaction commits, guaranteeing no phantom wakes.
- Pros: Zero idle database load, instant wake on event append (sub-millisecond latency)
- Cons: Requires a dedicated PostgreSQL connection (managed by
pq.NewListener, not from your connection pool)
Setup requires two pieces:
-
Store — configure the NOTIFY channel so
Append()firespg_notify:go store := postgres.NewStore(postgres.NewStoreConfig( postgres.WithNotifyChannel("pupsourcing_events"), )) -
Worker — create a
NotifyDispatcherand pass it as the wakeup source:```go import "github.com/pupsourcing/core/es/worker"
// connStr is a Postgres connection string (not *sql.DB) nd := postgres.NewNotifyDispatcher(connStr, &postgres.NotifyDispatcherConfig{ Channel: "pupsourcing_events", // Must match WithNotifyChannel FallbackInterval: 30 * time.Second, // Safety net for missed notifications })
w := postgres.NewWorker(db, store, worker.WithWakeupSource(nd), worker.WithTotalSegments(4), )
err := w.Run(ctx, &UserReadModel{}, &OrderProjection{}) ```
How it works:
store.Append()executesSELECT pg_notify('pupsourcing_events', '<position>')inside the caller's transaction- When the transaction commits, PostgreSQL delivers the notification to all
LISTENconnections NotifyDispatcherreceives the notification viapq.NewListenerand broadcasts a wake signal to all subscribed segment processors- Each segment processor immediately reads new events from the store
Fallback timer: The NotifyDispatcher includes a configurable fallback timer (default 30s). If no notification arrives within this window — due to a connection drop, network partition, or any other transient issue — it sends a wake signal anyway. This guarantees eventual progress even if LISTEN/NOTIFY is temporarily disrupted.
NotifyDispatcher configuration options:
| Option | Default | Purpose |
|---|---|---|
Channel |
"pupsourcing_events" |
Postgres NOTIFY channel name |
FallbackInterval |
30s |
Max time without notification before fallback wake |
MinReconnectInterval |
5s |
Min delay before reconnecting after listener error |
MaxReconnectInterval |
60s |
Max delay before reconnecting after repeated errors |
Logger |
nil |
Optional logger for observability |
One-Off Consumer Processing¶
Use BasicProcessor with RunModeOneOff when you want to process available events and exit.
package main
import (
"github.com/pupsourcing/core/es/consumer"
"github.com/pupsourcing/core/es/adapters/postgres"
)
store := postgres.NewStore(postgres.DefaultStoreConfig())
cfg := consumer.DefaultBasicProcessorConfig()
cfg.RunMode = consumer.RunModeOneOff
processor := postgres.NewBasicProcessor(db, store, cfg)
err := processor.Run(ctx, &UserReadModel{})
if err != nil {
return err
}
This mode is useful for:
- test runs
- catch-up jobs
- backfills
- controlled replay jobs
Integration Testing with RunModeOneOff¶
RunModeOneOff makes integration tests deterministic.
func TestUserProjection_OneOff(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
ctx := context.Background()
store := postgres.NewStore(postgres.DefaultStoreConfig())
// Arrange: append events
tx, _ := db.BeginTx(ctx, nil)
_, err := store.Append(ctx, tx, es.NoStream(), []es.Event{
{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: "user-1",
EventID: uuid.New(),
EventType: "UserCreated",
EventVersion: 1,
Payload: []byte(`{"email":"alice@example.com"}`),
Metadata: []byte(`{}`),
CreatedAt: time.Now(),
},
})
if err != nil {
t.Fatal(err)
}
if err := tx.Commit(); err != nil {
t.Fatal(err)
}
// Act: process and exit
cfg := consumer.DefaultBasicProcessorConfig()
cfg.RunMode = consumer.RunModeOneOff
p := postgres.NewBasicProcessor(db, store, cfg)
if err := p.Run(ctx, &UserReadModel{}); err != nil {
t.Fatalf("processor failed: %v", err)
}
// Assert read model/checkpoint state
assertUserExists(t, db, "user-1")
}
Consumer Guarantees¶
- At-least-once delivery semantics
- Ordered processing per segment
- Atomic checkpoint updates with consumer transaction
- Automatic resume from checkpoints after restart
- Horizontal scaling with Worker on PostgreSQL