Event Store

The event store is the persistence backbone of a pupsourcing application. It provides an append-only log of domain events backed by PostgreSQL, with built-in optimistic concurrency control, global ordering, and optional real-time notifications via LISTEN/NOTIFY.

The github.com/pupsourcing/store package defines the core types and interfaces. The store/postgres sub-package provides the production PostgreSQL implementation.


Installation

go get github.com/pupsourcing/store

Requires Go 1.24+ and PostgreSQL 12+.


Database Setup

Before using the store you need to create two tables: events and aggregate_heads. The library ships a migration generator so you can produce a plain SQL file that fits your migration tool of choice.

Migration Generation — Go API

package main

import "github.com/pupsourcing/store/migrations"

func main() {
    config := &migrations.Config{
        OutputFolder:        "./db/migrations",
        OutputFilename:      "001_event_store.sql",
        EventsTable:         "events",           // default
        AggregateHeadsTable: "aggregate_heads",   // default
    }
    migrations.GeneratePostgres(config)
}

Migration Generation — CLI

go run github.com/pupsourcing/store/cmd/migrate-gen \
    -output migrations \
    -filename 001_event_store.sql

Generated Schema

events table

The events table is the append-only log. Every persisted event gets a gap-free, monotonically increasing global_position via BIGSERIAL.

Column Type Notes
global_position BIGSERIAL Primary key — global ordering
aggregate_type TEXT NOT NULL e.g. "User", "Order"
aggregate_id TEXT NOT NULL Aggregate instance identifier
aggregate_version BIGINT NOT NULL Per-aggregate sequence number
event_id UUID NOT NULL Unique, client-generated
event_type TEXT NOT NULL e.g. "UserRegistered"
event_version INT DEFAULT 1 Schema version of the payload
payload BYTEA Serialized event data
trace_id TEXT Distributed tracing
correlation_id TEXT Correlation chain
causation_id TEXT Direct cause
metadata JSONB Arbitrary key-value metadata
created_at TIMESTAMPTZ Event timestamp

Constraints:

  • PRIMARY KEY (global_position)
  • UNIQUE (event_id)
  • UNIQUE (aggregate_type, aggregate_id, aggregate_version) — database-level safety net for optimistic concurrency

Indexes:

  • idx_events_aggregate — (aggregate_type, aggregate_id)
  • idx_events_event_type — (event_type)
  • idx_events_correlation — (correlation_id)

aggregate_heads table

The aggregate heads table tracks the latest version of each aggregate for O(1) version lookups during concurrency checks.

Column Type Notes
aggregate_type TEXT NOT NULL Composite primary key (part 1)
aggregate_id TEXT NOT NULL Composite primary key (part 2)
aggregate_version BIGINT NOT NULL Current head version
updated_at TIMESTAMPTZ Last update timestamp

Indexes:

  • idx_aggregate_heads_updated — (updated_at)

Store Configuration

Use StoreConfig to customise table names, logging, and notifications.

Default Configuration

config := postgres.DefaultStoreConfig()
// events table:          "events"
// aggregate_heads table: "aggregate_heads"
// notify channel:        "" (disabled)
// logger:                nil

Custom Configuration with Options

config := postgres.NewStoreConfig(
    postgres.WithEventsTable("my_events"),
    postgres.WithAggregateHeadsTable("my_aggregate_heads"),
    postgres.WithNotifyChannel("app_events"),
    postgres.WithLogger(myLogger),
)

Available Options

Option Default Description
WithEventsTable(name) "events" Name of the events table
WithAggregateHeadsTable(n) "aggregate_heads" Name of the aggregate heads table
WithNotifyChannel(ch) "" (disabled) PostgreSQL NOTIFY channel for real-time wakeups
WithLogger(l) nil Logger implementing store.Logger

Creating the Store

s := postgres.NewStore(config)

The returned *postgres.Store implements all four store interfaces: EventStore, EventReader, GlobalPositionReader, and AggregateStreamReader.


Core Types

Event

An Event is the pre-persistence representation. You build these and pass them to Append().

type Event struct {
    CreatedAt     time.Time
    AggregateType string
    EventType     string
    AggregateID   string
    Payload       []byte
    Metadata      []byte
    CausationID   store.NullString
    CorrelationID store.NullString
    TraceID       store.NullString
    EventVersion  int
    EventID       uuid.UUID
}

PersistedEvent

A PersistedEvent is what the store returns after persistence. It adds GlobalPosition and AggregateVersion — both assigned by PostgreSQL.

type PersistedEvent struct {
    CreatedAt        time.Time
    AggregateType    string
    EventType        string
    AggregateID      string
    Payload          []byte
    Metadata         []byte
    CausationID      store.NullString
    CorrelationID    store.NullString
    TraceID          store.NullString
    GlobalPosition   int64
    AggregateVersion int64
    EventVersion     int
    EventID          uuid.UUID
}
  • GlobalPosition — monotonically increasing sequence across all aggregates.
  • AggregateVersion — per-aggregate sequence starting at 1.

Stream

A Stream represents the event history of a single aggregate instance.

type Stream struct {
    AggregateType string
    AggregateID   string
    Events        []PersistedEvent
}
Method Description
Version() Returns the version of the last event, or 0
IsEmpty() Returns true when the stream has no events
Len() Returns the number of events in the stream

AppendResult

Returned by Append() on success.

type AppendResult struct {
    Events          []PersistedEvent
    GlobalPositions []int64
}
Method Description
FromVersion() Aggregate version of the first appended event
ToVersion() Aggregate version of the last appended event

ExpectedVersion

Controls optimistic concurrency when appending. Three constructors:

store.Any()          // Skip concurrency check
store.NoStream()     // Expect the aggregate does not exist yet (version 0)
store.Exact(version) // Expect the aggregate is at this exact version
Method Description
IsAny() True if the check is disabled
IsNoStream() True if expecting a new aggregate
IsExact() True if expecting a specific version
Value() Returns the expected version as int64
String() Human-readable representation

NullString

A nullable string helper, used for optional tracing fields.

type NullString struct {
    String string
    Valid  bool
}

Set Valid: true and populate String when the value is present. When Valid is false the field is stored as SQL NULL.


Store Interfaces

The store package defines four focused interfaces. The PostgreSQL implementation satisfies all of them.

EventStore — Appending Events

type EventStore interface {
    Append(
        ctx context.Context,
        tx *sql.Tx,
        expectedVersion ExpectedVersion,
        events []Event,
    ) (AppendResult, error)
}

Appends one or more events to a single aggregate stream inside the caller's transaction. The aggregate identity is derived from the events themselves (AggregateType + AggregateID).

EventReader — Reading the Global Log

type EventReader interface {
    ReadEvents(
        ctx context.Context,
        tx *sql.Tx,
        fromPosition int64,
        limit int,
    ) ([]PersistedEvent, error)
}

Reads events from the global log starting after fromPosition, ordered by global_position. Pass 0 to start from the beginning.

GlobalPositionReader — Position Check

type GlobalPositionReader interface {
    GetLatestGlobalPosition(ctx context.Context, tx *sql.Tx) (int64, error)
}

Returns the highest global_position in the events table. Useful for lightweight polling and health checks.

AggregateStreamReader — Aggregate History

type AggregateStreamReader interface {
    ReadAggregateStream(
        ctx context.Context,
        tx *sql.Tx,
        aggregateType, aggregateID string,
        fromVersion, toVersion *int64,
    ) (Stream, error)
}

Reads events for a specific aggregate instance, optionally filtered by a version range. Pass nil for either bound to leave it open.


Appending Events

Creating a New Aggregate

Use store.NoStream() to assert the aggregate doesn't exist yet:

tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

result, err := s.Append(ctx, tx, store.NoStream(), []store.Event{
    {
        AggregateType: "User",
        AggregateID:   "user-123",
        EventID:       uuid.New(),
        EventType:     "UserRegistered",
        EventVersion:  1,
        Payload:       []byte(`{"email":"alice@example.com","name":"Alice"}`),
        Metadata:      []byte(`{}`),
        CreatedAt:     time.Now(),
    },
})
if err != nil {
    return err
}

if err := tx.Commit(); err != nil {
    return err
}

fmt.Printf("Created at version %d (global pos %d)\n",
    result.ToVersion(), result.GlobalPositions[0])

Updating an Existing Aggregate

Use store.Exact(version) to guard against concurrent writes:

// First, load the current stream to get the version.
stream, err := s.ReadAggregateStream(ctx, tx, "User", "user-123", nil, nil)
if err != nil {
    return err
}

result, err := s.Append(ctx, tx, store.Exact(stream.Version()), []store.Event{
    {
        AggregateType: "User",
        AggregateID:   "user-123",
        EventID:       uuid.New(),
        EventType:     "UserEmailChanged",
        EventVersion:  1,
        Payload:       []byte(`{"email":"alice-new@example.com"}`),
        Metadata:      []byte(`{}`),
        CreatedAt:     time.Now(),
    },
})
if err != nil {
    if errors.Is(err, store.ErrOptimisticConcurrency) {
        // Another write happened — retry or return conflict
    }
    return err
}

Batch Appending

Multiple events can be appended atomically to the same aggregate:

result, err := s.Append(ctx, tx, store.NoStream(), []store.Event{
    {
        AggregateType: "Order",
        AggregateID:   "order-456",
        EventID:       uuid.New(),
        EventType:     "OrderCreated",
        EventVersion:  1,
        Payload:       []byte(`{"customer_id":"cust-1"}`),
        Metadata:      []byte(`{}`),
        CreatedAt:     time.Now(),
    },
    {
        AggregateType: "Order",
        AggregateID:   "order-456",
        EventID:       uuid.New(),
        EventType:     "OrderItemAdded",
        EventVersion:  1,
        Payload:       []byte(`{"sku":"WIDGET-1","qty":2}`),
        Metadata:      []byte(`{}`),
        CreatedAt:     time.Now(),
    },
})
if err != nil {
    return err
}

fmt.Printf("Appended versions %d–%d\n", result.FromVersion(), result.ToVersion())
// Output: Appended versions 1–2

Transaction Management

Append() operates inside your transaction. You control when to commit:

tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

// Multiple operations in the same transaction
_, err = s.Append(ctx, tx, store.NoStream(), userEvents)
if err != nil {
    return err
}

// e.g. write to a saga table in the same tx
_, err = tx.ExecContext(ctx, "INSERT INTO sagas ...")
if err != nil {
    return err
}

return tx.Commit()

Warning

The tx parameter must be an active transaction. Never pass nil. The store does not create or manage transactions — that is the caller's responsibility.


Reading Events

Reading an Aggregate Stream

Load the full history of a single aggregate:

stream, err := s.ReadAggregateStream(ctx, tx, "User", "user-123", nil, nil)
if err != nil {
    return err
}

if stream.IsEmpty() {
    fmt.Println("Aggregate not found")
    return nil
}

fmt.Printf("User user-123 at version %d (%d events)\n",
    stream.Version(), stream.Len())

for _, e := range stream.Events {
    fmt.Printf("  v%d: %s\n", e.AggregateVersion, e.EventType)
}

Load a specific version range:

from := int64(3)
to := int64(7)
stream, err := s.ReadAggregateStream(ctx, tx, "Order", "order-456", &from, &to)

Reading from the Global Log

Sequential reading is the foundation for consumers and projections:

var position int64 = 0

for {
    events, err := s.ReadEvents(ctx, tx, position, 100)
    if err != nil {
        return err
    }
    if len(events) == 0 {
        break // caught up
    }

    for _, e := range events {
        fmt.Printf("[%d] %s/%s v%d — %s\n",
            e.GlobalPosition, e.AggregateType, e.AggregateID,
            e.AggregateVersion, e.EventType)
        position = e.GlobalPosition
    }
}

fromPosition is exclusive — events with global_position > fromPosition are returned. Pass 0 to read from the very first event.

Filtered Reading with Scope

The PostgreSQL store adds ReadEventsWithScope to filter events by aggregate type at the database level:

scope := postgres.ReadEventsScope{
    AggregateTypes: []string{"User", "Account"},
}
events, err := s.ReadEventsWithScope(ctx, tx, position, 100, scope)

This pushes the filter to the SQL query, avoiding the need to skip irrelevant events in application code.

Getting the Latest Global Position

A lightweight check to see how far the log extends:

pos, err := s.GetLatestGlobalPosition(ctx, tx)
if err != nil {
    return err
}
fmt.Printf("Event store has events up to position %d\n", pos)

Optimistic Concurrency

Pupsourcing uses optimistic concurrency control to prevent conflicting writes to the same aggregate.

How It Works

  1. Application-level check — Before appending, the store reads the current version from aggregate_heads (an O(1) lookup by primary key) and compares it against ExpectedVersion.

  2. Database-level safety net — The UNIQUE (aggregate_type, aggregate_id, aggregate_version) constraint on the events table guarantees that even in the presence of a race condition, duplicate versions cannot be inserted.

  3. Aggregate heads update — On successful append, the store upserts aggregate_heads to reflect the new version.

Version Strategies

Strategy Use Case
store.NoStream() Creating a new aggregate — expects version 0
store.Exact(version) Updating an existing aggregate — prevents conflicts
store.Any() Append-only logs where ordering doesn't matter

Handling Conflicts

result, err := s.Append(ctx, tx, store.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
    // The aggregate was modified since we last read it.
    // Typical strategies:
    //   1. Reload the aggregate and retry the command
    //   2. Return a conflict error to the caller
    return fmt.Errorf("concurrent modification of aggregate %s: %w",
        aggregateID, err)
}

PostgreSQL NOTIFY

When a NOTIFY channel is configured, the store executes pg_notify inside the caller's transaction on every Append().

How It Works

  1. Append() calls SELECT pg_notify('<channel>', '<global_position>') inside the active transaction.
  2. The notification is delivered only when the transaction commits — there are no phantom notifications on rollback.
  3. Consumers using NotifyDispatcher receive the notification via pq.NewListener and immediately wake up to process new events.

Configuration

Enable it on the store side:

s := postgres.NewStore(postgres.NewStoreConfig(
    postgres.WithNotifyChannel("app_events"),
))

The consumer side uses NotifyDispatcher — see Consumers & Projections for wiring details.

Note

pg_notify adds negligible overhead to the append path. The payload is just the global position as a string.


Performance Considerations

  • O(1) version lookup — The aggregate_heads table provides constant-time version checks. The store never scans the events table to determine the current version.

  • BIGSERIAL global ordering — global_position is a gap-free, auto-incrementing sequence that gives consumers a reliable total order without external coordination.

  • JSONB metadata — The metadata column uses JSONB, enabling flexible querying (metadata->>'key') without schema changes.

  • Targeted indexes — The generated schema includes indexes for the most common query patterns: aggregate lookup, event type filtering, and correlation tracing.

  • Batch appending — Multiple events can be appended in a single Append() call, reducing round trips and transaction overhead.

  • Transaction ownership — The caller controls the transaction boundary, making it possible to combine event appends with other writes (saga state, outbox rows) atomically.


Error Handling

ErrOptimisticConcurrency

Returned when the expected version does not match the current aggregate version.

if errors.Is(err, store.ErrOptimisticConcurrency) {
    // Reload and retry, or return conflict to caller
}

ErrNoEvents

Returned when Append() is called with an empty events slice.

if errors.Is(err, store.ErrNoEvents) {
    // Nothing to append — this is a programming error
}

Logger Interface

The store accepts an optional logger conforming to store.Logger:

type Logger interface {
    Debug(ctx context.Context, msg string, keyvals ...interface{})
    Info(ctx context.Context, msg string, keyvals ...interface{})
    Error(ctx context.Context, msg string, keyvals ...interface{})
}

Pass your implementation via WithLogger():

config := postgres.NewStoreConfig(
    postgres.WithLogger(myLogger),
)

The store logs at Debug level for routine operations and Error for failures.


Quick Reference

Interface Method Description
EventStore Append Append events to an aggregate stream
EventReader ReadEvents Read from the global log
GlobalPositionReader GetLatestGlobalPosition Get the latest global position
AggregateStreamReader ReadAggregateStream Read a single aggregate's history
Error Cause
ErrOptimisticConcurrency Expected version mismatch
ErrNoEvents Empty events slice passed to Append