Event Store¶
The event store is the persistence backbone of a Pupsourcing application. It provides
an append-only log of domain events backed by PostgreSQL, with built-in optimistic
concurrency control, global ordering, and optional real-time notifications via
LISTEN/NOTIFY.
The github.com/pupsourcing/store package defines the core types and interfaces.
The store/postgres sub-package provides the production PostgreSQL implementation.
Installation¶
```sh
go get github.com/pupsourcing/store
```
Requires Go 1.24+ and PostgreSQL 12+.
Database Setup¶
Before using the store you need to create two tables: events and aggregate_heads. The library ships a migration generator so you can produce a plain SQL file that fits your migration tool of choice.
Migration Generation – Go API¶
```go
package main

import "github.com/pupsourcing/store/migrations"

func main() {
	config := &migrations.Config{
		OutputFolder:        "./db/migrations",
		OutputFilename:      "001_event_store.sql",
		EventsTable:         "events",          // default
		AggregateHeadsTable: "aggregate_heads", // default
	}
	migrations.GeneratePostgres(config)
}
```
Migration Generation – CLI¶
```sh
go run github.com/pupsourcing/store/cmd/migrate-gen \
  -output migrations \
  -filename 001_event_store.sql
```
Generated Schema¶
events table¶
The events table is the append-only log. Every persisted event gets a monotonically
increasing global_position via BIGSERIAL. (Like any PostgreSQL sequence, it may skip
values when a transaction rolls back, but the order is always consistent.)
| Column | Type | Notes |
|---|---|---|
| `global_position` | `BIGSERIAL` | Primary key; global ordering |
| `aggregate_type` | `TEXT NOT NULL` | e.g. `"User"`, `"Order"` |
| `aggregate_id` | `TEXT NOT NULL` | Aggregate instance identifier |
| `aggregate_version` | `BIGINT NOT NULL` | Per-aggregate sequence number |
| `event_id` | `UUID NOT NULL` | Unique, client-generated |
| `event_type` | `TEXT NOT NULL` | e.g. `"UserRegistered"` |
| `event_version` | `INT DEFAULT 1` | Schema version of the payload |
| `payload` | `BYTEA` | Serialized event data |
| `trace_id` | `TEXT` | Distributed tracing |
| `correlation_id` | `TEXT` | Correlation chain |
| `causation_id` | `TEXT` | Direct cause |
| `metadata` | `JSONB` | Arbitrary key-value metadata |
| `created_at` | `TIMESTAMPTZ` | Event timestamp |
Constraints:

- `PRIMARY KEY (global_position)`
- `UNIQUE (event_id)`
- `UNIQUE (aggregate_type, aggregate_id, aggregate_version)` – database-level safety net for optimistic concurrency

Indexes:

- `idx_events_aggregate` on `(aggregate_type, aggregate_id)`
- `idx_events_event_type` on `(event_type)`
- `idx_events_correlation` on `(correlation_id)`
aggregate_heads table¶
The aggregate heads table tracks the latest version of each aggregate for O(1) version lookups during concurrency checks.
| Column | Type | Notes |
|---|---|---|
| `aggregate_type` | `TEXT NOT NULL` | Composite primary key (part 1) |
| `aggregate_id` | `TEXT NOT NULL` | Composite primary key (part 2) |
| `aggregate_version` | `BIGINT NOT NULL` | Current head version |
| `updated_at` | `TIMESTAMPTZ` | Last update timestamp |
Indexes:

- `idx_aggregate_heads_updated` on `(updated_at)`
Store Configuration¶
Use StoreConfig to customise table names, logging, and notifications.
Default Configuration¶
```go
config := postgres.DefaultStoreConfig()
// events table:          "events"
// aggregate_heads table: "aggregate_heads"
// notify channel:        "" (disabled)
// logger:                nil
```
Custom Configuration with Options¶
```go
config := postgres.NewStoreConfig(
	postgres.WithEventsTable("my_events"),
	postgres.WithAggregateHeadsTable("my_aggregate_heads"),
	postgres.WithNotifyChannel("app_events"),
	postgres.WithLogger(myLogger),
)
```
Available Options¶
| Option | Default | Description |
|---|---|---|
| `WithEventsTable(name)` | `"events"` | Name of the events table |
| `WithAggregateHeadsTable(n)` | `"aggregate_heads"` | Name of the aggregate heads table |
| `WithNotifyChannel(ch)` | `""` (disabled) | PostgreSQL NOTIFY channel for real-time wakeups |
| `WithLogger(l)` | `nil` | Logger implementing `store.Logger` |
Creating the Store¶
```go
s := postgres.NewStore(config)
```
The returned *postgres.Store implements all four store interfaces:
EventStore, EventReader, GlobalPositionReader, and AggregateStreamReader.
Core Types¶
Event¶
An Event is the pre-persistence representation. You build these and pass them to
Append().
```go
type Event struct {
	CreatedAt     time.Time
	AggregateType string
	EventType     string
	AggregateID   string
	Payload       []byte
	Metadata      []byte
	CausationID   store.NullString
	CorrelationID store.NullString
	TraceID       store.NullString
	EventVersion  int
	EventID       uuid.UUID
}
```
PersistedEvent¶
A PersistedEvent is what the store returns after persistence. It adds
GlobalPosition and AggregateVersion, both assigned by PostgreSQL.
```go
type PersistedEvent struct {
	CreatedAt        time.Time
	AggregateType    string
	EventType        string
	AggregateID      string
	Payload          []byte
	Metadata         []byte
	CausationID      store.NullString
	CorrelationID    store.NullString
	TraceID          store.NullString
	GlobalPosition   int64
	AggregateVersion int64
	EventVersion     int
	EventID          uuid.UUID
}
```
- `GlobalPosition` – monotonically increasing sequence across all aggregates.
- `AggregateVersion` – per-aggregate sequence starting at 1.
Stream¶
A Stream represents the event history of a single aggregate instance.
```go
type Stream struct {
	AggregateType string
	AggregateID   string
	Events        []PersistedEvent
}
```
| Method | Description |
|---|---|
| `Version()` | Returns the version of the last event, or 0 |
| `IsEmpty()` | Returns true when the stream has no events |
| `Len()` | Returns the number of events in the stream |
AppendResult¶
Returned by Append() on success.
```go
type AppendResult struct {
	Events          []PersistedEvent
	GlobalPositions []int64
}
```
| Method | Description |
|---|---|
| `FromVersion()` | Aggregate version of the first appended event |
| `ToVersion()` | Aggregate version of the last appended event |
ExpectedVersion¶
Controls optimistic concurrency when appending. Three constructors:
```go
store.Any()          // Skip concurrency check
store.NoStream()     // Expect the aggregate does not exist yet (version 0)
store.Exact(version) // Expect the aggregate is at this exact version
```
| Method | Description |
|---|---|
| `IsAny()` | True if the check is disabled |
| `IsNoStream()` | True if expecting a new aggregate |
| `IsExact()` | True if expecting a specific version |
| `Value()` | Returns the expected version as int64 |
| `String()` | Human-readable representation |
NullString¶
A nullable string helper, used for optional tracing fields.
```go
type NullString struct {
	String string
	Valid  bool
}
```
Set `Valid: true` and populate `String` when the value is present. When `Valid` is
false the field is stored as SQL `NULL`.
Store Interfaces¶
The store package defines four focused interfaces. The PostgreSQL implementation satisfies all of them.
EventStore – Appending Events¶
```go
type EventStore interface {
	Append(
		ctx context.Context,
		tx *sql.Tx,
		expectedVersion ExpectedVersion,
		events []Event,
	) (AppendResult, error)
}
```
Appends one or more events to a single aggregate stream inside the caller's
transaction. The aggregate identity is derived from the events themselves
(AggregateType + AggregateID).
EventReader – Reading the Global Log¶
```go
type EventReader interface {
	ReadEvents(
		ctx context.Context,
		tx *sql.Tx,
		fromPosition int64,
		limit int,
	) ([]PersistedEvent, error)
}
```
Reads events from the global log starting after fromPosition, ordered by
global_position. Pass 0 to start from the beginning.
GlobalPositionReader – Position Check¶
```go
type GlobalPositionReader interface {
	GetLatestGlobalPosition(ctx context.Context, tx *sql.Tx) (int64, error)
}
```
Returns the highest global_position in the events table. Useful for lightweight
polling and health checks.
AggregateStreamReader – Aggregate History¶
```go
type AggregateStreamReader interface {
	ReadAggregateStream(
		ctx context.Context,
		tx *sql.Tx,
		aggregateType, aggregateID string,
		fromVersion, toVersion *int64,
	) (Stream, error)
}
```
Reads events for a specific aggregate instance, optionally filtered by a version
range. Pass nil for either bound to leave it open.
Appending Events¶
Creating a New Aggregate¶
Use store.NoStream() to assert the aggregate doesn't exist yet:
```go
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}
defer tx.Rollback()

result, err := s.Append(ctx, tx, store.NoStream(), []store.Event{
	{
		AggregateType: "User",
		AggregateID:   "user-123",
		EventID:       uuid.New(),
		EventType:     "UserRegistered",
		EventVersion:  1,
		Payload:       []byte(`{"email":"alice@example.com","name":"Alice"}`),
		Metadata:      []byte(`{}`),
		CreatedAt:     time.Now(),
	},
})
if err != nil {
	return err
}
if err := tx.Commit(); err != nil {
	return err
}

fmt.Printf("Created at version %d (global pos %d)\n",
	result.ToVersion(), result.GlobalPositions[0])
```
Updating an Existing Aggregate¶
Use store.Exact(version) to guard against concurrent writes:
```go
// First, load the current stream to get the version.
stream, err := s.ReadAggregateStream(ctx, tx, "User", "user-123", nil, nil)
if err != nil {
	return err
}

result, err := s.Append(ctx, tx, store.Exact(stream.Version()), []store.Event{
	{
		AggregateType: "User",
		AggregateID:   "user-123",
		EventID:       uuid.New(),
		EventType:     "UserEmailChanged",
		EventVersion:  1,
		Payload:       []byte(`{"email":"alice-new@example.com"}`),
		Metadata:      []byte(`{}`),
		CreatedAt:     time.Now(),
	},
})
if err != nil {
	if errors.Is(err, store.ErrOptimisticConcurrency) {
		// Another write happened: retry or return a conflict
	}
	return err
}
```
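On conflict, a command handler often reloads the aggregate and retries a bounded number of times. The sketch below is self-contained: `errConflict` and `retryOnConflict` are hypothetical stand-ins for `store.ErrOptimisticConcurrency` and a real reload-and-append round trip.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict stands in for store.ErrOptimisticConcurrency in this sketch.
var errConflict = errors.New("optimistic concurrency conflict")

// retryOnConflict reruns attempt up to maxRetries times when it reports a
// version conflict; any other outcome (success or error) returns immediately.
func retryOnConflict(maxRetries int, attempt func() error) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = attempt(); !errors.Is(err, errConflict) {
			return err
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxRetries, err)
}

func main() {
	calls := 0
	err := retryOnConflict(3, func() error {
		calls++
		if calls < 3 {
			return errConflict // simulate two lost races before succeeding
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

In a real handler, `attempt` would re-read the stream, rebuild state, and call `Append` with the fresh `store.Exact` version.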
Batch Appending¶
Multiple events can be appended atomically to the same aggregate:
```go
result, err := s.Append(ctx, tx, store.NoStream(), []store.Event{
	{
		AggregateType: "Order",
		AggregateID:   "order-456",
		EventID:       uuid.New(),
		EventType:     "OrderCreated",
		EventVersion:  1,
		Payload:       []byte(`{"customer_id":"cust-1"}`),
		Metadata:      []byte(`{}`),
		CreatedAt:     time.Now(),
	},
	{
		AggregateType: "Order",
		AggregateID:   "order-456",
		EventID:       uuid.New(),
		EventType:     "OrderItemAdded",
		EventVersion:  1,
		Payload:       []byte(`{"sku":"WIDGET-1","qty":2}`),
		Metadata:      []byte(`{}`),
		CreatedAt:     time.Now(),
	},
})
if err != nil {
	return err
}

fmt.Printf("Appended versions %d-%d\n", result.FromVersion(), result.ToVersion())
// Output: Appended versions 1-2
```
Transaction Management¶
Append() operates inside your transaction. You control when to commit:
```go
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}
defer tx.Rollback()

// Multiple operations in the same transaction
_, err = s.Append(ctx, tx, store.NoStream(), userEvents)
if err != nil {
	return err
}

// e.g. write to a saga table in the same tx
_, err = tx.ExecContext(ctx, "INSERT INTO sagas ...")
if err != nil {
	return err
}

return tx.Commit()
```
Warning
The tx parameter must be an active transaction. Never pass nil. The store
does not create or manage transactions; that is the caller's responsibility.
Reading Events¶
Reading an Aggregate Stream¶
Load the full history of a single aggregate:
```go
stream, err := s.ReadAggregateStream(ctx, tx, "User", "user-123", nil, nil)
if err != nil {
	return err
}
if stream.IsEmpty() {
	fmt.Println("Aggregate not found")
	return nil
}

fmt.Printf("User user-123 at version %d (%d events)\n",
	stream.Version(), stream.Len())
for _, e := range stream.Events {
	fmt.Printf("  v%d: %s\n", e.AggregateVersion, e.EventType)
}
```
Load a specific version range:
```go
from := int64(3)
to := int64(7)
stream, err := s.ReadAggregateStream(ctx, tx, "Order", "order-456", &from, &to)
```
Reading from the Global Log¶
Sequential reading is the foundation for consumers and projections:
```go
var position int64 = 0
for {
	events, err := s.ReadEvents(ctx, tx, position, 100)
	if err != nil {
		return err
	}
	if len(events) == 0 {
		break // caught up
	}
	for _, e := range events {
		fmt.Printf("[%d] %s/%s v%d -> %s\n",
			e.GlobalPosition, e.AggregateType, e.AggregateID,
			e.AggregateVersion, e.EventType)
		position = e.GlobalPosition
	}
}
```
`fromPosition` is exclusive: events with `global_position > fromPosition` are
returned. Pass 0 to read from the very first event.
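Because `fromPosition` is exclusive, a consumer can persist the last processed position as a checkpoint and resume exactly where it left off. A self-contained simulation of that pattern (the `event` type and `readPage` are stand-ins for `store.PersistedEvent` and `ReadEvents` over an in-memory log):

```go
package main

import "fmt"

// event is a trimmed stand-in for store.PersistedEvent in this sketch.
type event struct {
	GlobalPosition int64
	EventType      string
}

// eventLog simulates the global log.
var eventLog = []event{
	{1, "UserRegistered"}, {2, "UserEmailChanged"}, {3, "OrderCreated"},
	{4, "OrderItemAdded"}, {5, "OrderShipped"},
}

// readPage mimics ReadEvents: events with GlobalPosition > from, up to limit.
func readPage(from int64, limit int) []event {
	var out []event
	for _, e := range eventLog {
		if e.GlobalPosition > from {
			out = append(out, e)
			if len(out) == limit {
				break
			}
		}
	}
	return out
}

func main() {
	var checkpoint int64 = 2 // pretend positions 1-2 were handled before a restart
	for {
		page := readPage(checkpoint, 2)
		if len(page) == 0 {
			break // caught up
		}
		for _, e := range page {
			fmt.Printf("[%d] %s\n", e.GlobalPosition, e.EventType)
			checkpoint = e.GlobalPosition // persist atomically with the handler's writes in real code
		}
	}
	fmt.Println("resumed through position", checkpoint)
}
```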
Filtered Reading with Scope¶
The PostgreSQL store adds ReadEventsWithScope to filter events by aggregate type
at the database level:
```go
scope := postgres.ReadEventsScope{
	AggregateTypes: []string{"User", "Account"},
}
events, err := s.ReadEventsWithScope(ctx, tx, position, 100, scope)
```
This pushes the filter to the SQL query, avoiding the need to skip irrelevant events in application code.
Getting the Latest Global Position¶
A lightweight check to see how far the log extends:
```go
pos, err := s.GetLatestGlobalPosition(ctx, tx)
if err != nil {
	return err
}
fmt.Printf("Event store has events up to position %d\n", pos)
```
Optimistic Concurrency¶
Pupsourcing uses optimistic concurrency control to prevent conflicting writes to the same aggregate.
How It Works¶
-
Application-level check ā Before appending, the store reads the current version from
aggregate_heads(an O(1) lookup by primary key) and compares it againstExpectedVersion. -
Database-level safety net ā The
UNIQUE (aggregate_type, aggregate_id, aggregate_version)constraint on the events table guarantees that even in the presence of a race condition, duplicate versions cannot be inserted. -
Aggregate heads update ā On successful append, the store upserts
aggregate_headsto reflect the new version.
Version Strategies¶
| Strategy | Use Case |
|---|---|
| `store.NoStream()` | Creating a new aggregate; expects version 0 |
| `store.Exact(version)` | Updating an existing aggregate; prevents conflicts |
| `store.Any()` | Append-only logs where ordering doesn't matter |
Handling Conflicts¶
```go
_, err := s.Append(ctx, tx, store.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
	// The aggregate was modified since we last read it.
	// Typical strategies:
	//   1. Reload the aggregate and retry the command
	//   2. Return a conflict error to the caller
	return fmt.Errorf("concurrent modification of aggregate %s: %w",
		aggregateID, err)
}
```
PostgreSQL NOTIFY¶
When a NOTIFY channel is configured, the store executes pg_notify inside the
caller's transaction on every Append().
How It Works¶
1. `Append()` calls `SELECT pg_notify('<channel>', '<global_position>')` inside the active transaction.
2. The notification is delivered only when the transaction commits; there are no phantom notifications on rollback.
3. Consumers using `NotifyDispatcher` receive the notification via `pq.NewListener` and immediately wake up to process new events.
Configuration¶
Enable it on the store side:
```go
s := postgres.NewStore(postgres.NewStoreConfig(
	postgres.WithNotifyChannel("app_events"),
))
```
The consumer side uses NotifyDispatcher; see
Consumers & Projections for wiring details.
Note
pg_notify adds negligible overhead to the append path. The payload is just the
global position as a string.
Performance Considerations¶
- **O(1) version lookup** – The `aggregate_heads` table provides constant-time version checks. The store never scans the events table to determine the current version.
- **BIGSERIAL global ordering** – `global_position` is an auto-incrementing sequence that gives consumers a reliable total order without external coordination.
- **JSONB metadata** – The `metadata` column uses `JSONB`, enabling flexible querying (`metadata->>'key'`) without schema changes.
- **Targeted indexes** – The generated schema includes indexes for the most common query patterns: aggregate lookup, event type filtering, and correlation tracing.
- **Batch appending** – Multiple events can be appended in a single `Append()` call, reducing round trips and transaction overhead.
- **Transaction ownership** – The caller controls the transaction boundary, making it possible to combine event appends with other writes (saga state, outbox rows) atomically.
Error Handling¶
ErrOptimisticConcurrency¶
Returned when the expected version does not match the current aggregate version.
```go
if errors.Is(err, store.ErrOptimisticConcurrency) {
	// Reload and retry, or return conflict to caller
}
```
ErrNoEvents¶
Returned when Append() is called with an empty events slice.
```go
if errors.Is(err, store.ErrNoEvents) {
	// Nothing to append; this is a programming error
}
```
Logger Interface¶
The store accepts an optional logger conforming to store.Logger:
```go
type Logger interface {
	Debug(ctx context.Context, msg string, keyvals ...interface{})
	Info(ctx context.Context, msg string, keyvals ...interface{})
	Error(ctx context.Context, msg string, keyvals ...interface{})
}
```
Pass your implementation via WithLogger():
```go
config := postgres.NewStoreConfig(
	postgres.WithLogger(myLogger),
)
```
The store logs at Debug level for routine operations and Error for failures.
Quick Reference¶
| Interface | Method | Description |
|---|---|---|
| `EventStore` | `Append` | Append events to an aggregate stream |
| `EventReader` | `ReadEvents` | Read from the global log |
| `GlobalPositionReader` | `GetLatestGlobalPosition` | Get the latest global position |
| `AggregateStreamReader` | `ReadAggregateStream` | Read a single aggregate's history |
| Error | Cause |
|---|---|
| `ErrOptimisticConcurrency` | Expected version mismatch |
| `ErrNoEvents` | Empty events slice passed to `Append` |