Core Concepts

This guide covers the fundamental concepts behind event sourcing and how pupsourcing implements them. Whether you're new to event sourcing or evaluating pupsourcing for a project, start here.

Table of Contents


What Is Event Sourcing?

Event sourcing is a pattern where you store every state change as an immutable event instead of overwriting the current state of a record. Rather than keeping only the latest snapshot, you keep the full history of everything that happened.

The Bank Statement Analogy

Think of a bank account. A traditional database stores your current balance — say, $1,250.00. If someone asks "how did you get to $1,250?" you can't answer.

A bank statement works differently. It records every transaction:

# Description Amount Balance
1 Initial deposit +$1,000.00 $1,000.00
2 Coffee shop −$4.50 $995.50
3 Salary +$500.00 $1,495.50
4 Electric bill −$120.00 $1,375.50
5 Grocery store −$125.50 $1,250.00

From this sequence of transactions, you can:

  • Reconstruct the balance at any point in time
  • Audit every change that occurred
  • Detect errors or fraud by replaying history
  • Build new views without changing the source data

Event sourcing applies this same idea to your application's data.

CRUD vs Event Sourcing

Traditional CRUD — you mutate a row in place:

-- Create a user
INSERT INTO users (id, email, status) VALUES ('user-123', 'alice@example.com', 'pending');

-- Activate the user — previous state is lost
UPDATE users SET status = 'active' WHERE id = 'user-123';

-- Change email — previous email is lost
UPDATE users SET email = 'alice@newdomain.com' WHERE id = 'user-123';

After these operations, you only know the current state. When did the user activate? What was their original email? That information is gone.

Event sourcing — you append immutable events:

UserRegistered   { email: "alice@example.com" }      → version 1
UserActivated    { activated_at: "2025-01-15T..." }   → version 2
EmailChanged     { old: "alice@example.com", new: "alice@newdomain.com" } → version 3

The current state is derived by replaying these events. Every previous state is preserved.

Benefits

  • Complete audit trail: every change is recorded with full context
  • Temporal queries: reconstruct state at any point in time
  • Replay capability: rebuild read models, fix bugs in projections, add new views — all from the same event history
  • Flexibility: new consumers can process the entire event log to build views that didn't exist when events were originally written

Events

An event is an immutable fact that records something that happened in your system. Events are always named in the past tense because they describe things that already occurred:

  • UserRegistered
  • OrderPlaced
  • PaymentProcessed
  • InventoryAdjusted

Note

Events are facts, not commands. RegisterUser is a command (a request to do something). UserRegistered is an event (confirmation that it happened).

Event vs PersistedEvent

Pupsourcing distinguishes between two event types:

Event — what you create before persistence:

event := store.Event{
    AggregateType: "Order",
    AggregateID:   "order-456",
    EventType:     "OrderPlaced",
    EventVersion:  1,
    EventID:       uuid.New(),
    Payload:       payload,       // []byte — typically JSON
    Metadata:      metadata,      // []byte — cross-cutting concerns
    CorrelationID: store.NullString{String: correlationID, Valid: true},
    CausationID:   store.NullString{String: causationID, Valid: true},
}

An Event does not have a position or aggregate version — those are assigned by the store during persistence.

PersistedEvent — what you get back after persistence:

type PersistedEvent struct {
    // Everything from Event, plus:
    GlobalPosition   int64     // Position in the global event log
    AggregateVersion int64     // Version within this aggregate's stream
}

The store assigns GlobalPosition (a monotonically increasing sequence across all events) and AggregateVersion (a sequence within the specific aggregate's stream) when the event is written.

Key Event Fields

Field Type Purpose
AggregateType string Category of aggregate (e.g., "Order", "User")
AggregateID string Unique identifier within the aggregate type
EventType string Name of the event (e.g., "OrderPlaced")
EventVersion int Schema version of the payload — used for evolution, not sequencing
EventID uuid.UUID Idempotency key you assign — not the same as position
Payload []byte Serialized event data (typically JSON)
Metadata []byte Cross-cutting data: trace IDs, user context, not domain data
CausationID NullString ID of the command or event that caused this event
CorrelationID NullString Groups related events across aggregates
TraceID NullString Distributed tracing identifier

EventVersion: Schema Evolution

EventVersion represents the schema version of the event payload, not the sequence of the event. When you change the structure of an event's payload, you increment the version:

// Version 1: original schema
store.Event{
    EventType:    "OrderPlaced",
    EventVersion: 1,
    Payload:      []byte(`{"item_id": "item-1", "quantity": 1}`),
}

// Version 2: added shipping_address field
store.Event{
    EventType:    "OrderPlaced",
    EventVersion: 2,
    Payload:      []byte(`{"item_id": "item-1", "quantity": 1, "shipping_address": "..."}`),
}

Consumers handle multiple versions using type switches. See Event Upcasting in Common Patterns.

EventID: Idempotency Key

The EventID is a UUID that you assign before appending. It serves as an idempotency key — the database enforces uniqueness on this field. If you retry an append with the same EventID, the database will reject duplicates.

Warning

EventID is not the same as GlobalPosition. The EventID is your identifier for the event. GlobalPosition is the store's monotonically increasing sequence number.

Payload and Metadata

  • Payload ([]byte): the domain-specific data of the event. Typically serialized as JSON. This is the "what happened" — the order items, the new email address, the payment amount.

  • Metadata ([]byte): cross-cutting concerns that are not part of the domain model. Examples include the user who initiated the action, request IDs, or deployment version. Stored as JSONB in PostgreSQL.

Tip

Keep domain data in the payload and operational/infrastructure data in metadata. This separation makes it easier to evolve your domain model independently of your infrastructure concerns.


Aggregates

An aggregate is a cluster of domain objects treated as a single unit for data consistency. In Domain-Driven Design, aggregates define transaction boundaries — all changes within an aggregate are atomic.

In pupsourcing, an aggregate is identified by two values:

  • AggregateType: a string that categorizes the aggregate (e.g., "User", "Order", "ShoppingCart")
  • AggregateID: a string that uniquely identifies an instance within the type (e.g., "user-123", "order-456")

Together, they form a unique identifier for an aggregate:

("User",  "user-123")   → a specific user's event stream
("Order", "order-456")  → a specific order's event stream
("User",  "user-789")   → a different user's event stream

Aggregate Streams

Every aggregate has a stream — the ordered sequence of events that belong to it. The Stream type represents this:

type Stream struct {
    AggregateType string
    AggregateID   string
    Events        []PersistedEvent
}

A stream provides these methods:

  • Version() — returns the current version (the AggregateVersion of the last event)
  • IsEmpty() — returns true if the aggregate has no events
  • Len() — returns the number of events in the stream

AggregateVersion

Each event within an aggregate's stream gets an AggregateVersion — a monotonically increasing integer starting at 1:

("Order", "order-456") stream:
  AggregateVersion 1: OrderPlaced
  AggregateVersion 2: ItemAdded
  AggregateVersion 3: ItemAdded
  AggregateVersion 4: OrderConfirmed

This version is assigned by the store on write and is used for optimistic concurrency control.

Note

AggregateVersion is scoped to a single aggregate. Two different aggregates both start at version 1 and increment independently.


The Event Store

The event store is an append-only log. Events are never modified or deleted — only new events are appended. This immutability is what gives event sourcing its power.

Two Dimensions of Ordering

Every persisted event has two sequence numbers:

  1. AggregateVersion — the position within a single aggregate's stream. Used for reconstructing aggregate state and concurrency control.

  2. GlobalPosition — the position within the global event log across all aggregates. Assigned as a BIGSERIAL by PostgreSQL. Used by consumers to process events in order.

GlobalPosition | AggregateType | AggregateID | AggregateVersion | EventType
1              | User          | user-123    | 1                | UserRegistered
2              | Order         | order-456   | 1                | OrderPlaced
3              | User          | user-123    | 2                | UserActivated
4              | Order         | order-456   | 2                | ItemAdded
5              | Order         | order-789   | 1                | OrderPlaced
6              | User          | user-123    | 3                | EmailChanged

Notice how GlobalPosition increases across all events, while AggregateVersion increases independently per aggregate.

Transaction-First Design

All event store operations require a *sql.Tx — you provide the transaction, you control the boundaries:

tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

result, err := eventStore.Append(ctx, tx, expectedVersion, events)
if err != nil {
    return err
}

// You can do more work in the same transaction:
// - Update a read model
// - Write to an outbox
// - Store encryption keys

if err := tx.Commit(); err != nil {
    return err
}

This is a deliberate design choice. By requiring explicit transactions, pupsourcing lets you compose multiple operations atomically. See Design Principles for more on this philosophy.

The Append Operation

The core write operation is Append:

type EventStore interface {
    Append(
        ctx context.Context,
        tx *sql.Tx,
        expectedVersion ExpectedVersion,
        events []Event,
    ) (AppendResult, error)
}

Append takes a slice of events and writes them atomically. All events in a single Append call must belong to the same aggregate.

The returned AppendResult contains the persisted events with their assigned positions:

type AppendResult struct {
    Events          []PersistedEvent
    GlobalPositions []int64
}

AppendResult provides helper methods:

  • FromVersion() — the AggregateVersion of the first event written
  • ToVersion() — the AggregateVersion of the last event written

Reading Events

Pupsourcing provides two reading interfaces:

EventReader — read from the global log:

type EventReader interface {
    ReadEvents(
        ctx context.Context,
        tx *sql.Tx,
        fromPosition int64,
        limit int,
    ) ([]PersistedEvent, error)
}

AggregateStreamReader — read a single aggregate's stream:

type AggregateStreamReader interface {
    ReadAggregateStream(
        ctx context.Context,
        tx *sql.Tx,
        aggregateType string,
        aggregateID string,
        fromVersion int64,
        toVersion int64,
    ) (Stream, error)
}

PostgreSQL Schema

Under the hood, pupsourcing uses two PostgreSQL tables:

events — the append-only event log:

Column Type Description
global_position BIGSERIAL PK Auto-incrementing global sequence
aggregate_type TEXT Aggregate category
aggregate_id TEXT Aggregate instance
aggregate_version BIGINT Version within aggregate stream
event_id UUID UNIQUE Your idempotency key
event_type TEXT Event name
event_version INT Payload schema version
payload BYTEA Serialized event data
metadata JSONB Cross-cutting metadata
trace_id TEXT Distributed tracing ID
correlation_id TEXT Correlation identifier
causation_id TEXT Causation identifier
created_at TIMESTAMPTZ When the event was written

A UNIQUE constraint on (aggregate_type, aggregate_id, aggregate_version) provides a database-level safety net for concurrency control.

aggregate_heads — tracks the current version of each aggregate:

Column Type Description
aggregate_type TEXT Part of composite PK
aggregate_id TEXT Part of composite PK
aggregate_version BIGINT Current version

This table enables O(1) version lookups — instead of scanning the events table to find the latest version, the store reads a single row from aggregate_heads.


Optimistic Concurrency

When multiple processes try to write to the same aggregate simultaneously, you need a way to prevent conflicts. Pupsourcing uses optimistic concurrency control — rather than locking the aggregate before writing, you declare what version you expect and the store verifies it at write time.

ExpectedVersion

Every Append call includes an ExpectedVersion that tells the store what state you expect the aggregate to be in:

// The aggregate must not exist yet (first event)
store.ExpectedVersion.NoStream()

// The aggregate must be at exactly this version
store.ExpectedVersion.Exact(3)

// Skip the version check entirely (use with care)
store.ExpectedVersion.Any()

NoStream() — use when creating a new aggregate. The store verifies no events exist for this (AggregateType, AggregateID) pair. If events already exist, the append fails.

Exact(version) — use when appending to an existing aggregate. You first read the aggregate's current version, then pass it as the expected version. If another process wrote events between your read and write, the versions won't match and the append fails.

Any() — skips the concurrency check entirely. Use this only when you're certain there's no contention, or when conflicting writes are acceptable.

Warning

Using Any() means you lose protection against concurrent writes. It should only be used for append-only aggregates where every event is independent and the order doesn't matter for correctness.

Two Layers of Protection

Pupsourcing provides concurrency safety at two levels:

  1. Application layer: the store reads the current version from aggregate_heads and validates it against your ExpectedVersion before inserting events.

  2. Database layer: the UNIQUE constraint on (aggregate_type, aggregate_id, aggregate_version) acts as a safety net. Even if the application check is bypassed due to a race condition, the database rejects conflicting inserts.

Handling Concurrency Conflicts

When a concurrency conflict occurs, the store returns ErrOptimisticConcurrency. The standard approach is to retry:

func placeOrder(ctx context.Context, db *sql.DB, es store.EventStore, orderID string, items []Item) error {
    const maxRetries = 3

    for attempt := 0; attempt < maxRetries; attempt++ {
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return err
        }
        defer tx.Rollback()

        // Read current state
        stream, err := es.ReadAggregateStream(ctx, tx, "Order", orderID, 0, 0)
        if err != nil {
            return err
        }

        // Determine expected version
        var expectedVersion store.ExpectedVersion
        if stream.IsEmpty() {
            expectedVersion = store.ExpectedVersion.NoStream()
        } else {
            expectedVersion = store.ExpectedVersion.Exact(stream.Version())
        }

        // Build and append events
        events := buildOrderEvents(stream, items)
        _, err = es.Append(ctx, tx, expectedVersion, events)
        if errors.Is(err, store.ErrOptimisticConcurrency) {
            tx.Rollback()
            continue // Retry with fresh state
        }
        if err != nil {
            return err
        }

        return tx.Commit()
    }

    return fmt.Errorf("failed to place order after %d retries", maxRetries)
}

Tip

The retry loop re-reads the aggregate's current state on each attempt. This ensures your business logic runs against the latest version. For most applications, 2–3 retries are sufficient.


Consumers and Projections

Consumers are components that process events from the global log to build projections — read-optimized views of your data.

In a traditional CRUD system, your database tables serve double duty as both the write model and the read model. With event sourcing, you separate these concerns: the event store is your write model, and projections are your read models.

The Consumer Interface

A consumer implements two methods:

type Consumer interface {
    Name() string
    Handle(ctx context.Context, tx *sql.Tx, event PersistedEvent) error
}
  • Name(): a unique identifier for this consumer. Used to track checkpoints — where this consumer has read up to in the global log.

  • Handle(): called for each event, in order. Receives a transaction that encompasses both the event processing and the checkpoint advance.

Example — a consumer that maintains a user list projection:

type UserListProjection struct{}

func (p *UserListProjection) Name() string {
    return "user-list-projection"
}

func (p *UserListProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    switch event.EventType {
    case "UserRegistered":
        var payload UserRegisteredPayload
        if err := json.Unmarshal(event.Payload, &payload); err != nil {
            return err
        }
        _, err := tx.ExecContext(ctx,
            `INSERT INTO user_list (id, email, status, registered_at)
             VALUES ($1, $2, 'active', $3)
             ON CONFLICT (id) DO UPDATE SET email = $2, status = 'active'`,
            event.AggregateID, payload.Email, event.CreatedAt,
        )
        return err

    case "EmailChanged":
        var payload EmailChangedPayload
        if err := json.Unmarshal(event.Payload, &payload); err != nil {
            return err
        }
        _, err := tx.ExecContext(ctx,
            `UPDATE user_list SET email = $1 WHERE id = $2`,
            payload.NewEmail, event.AggregateID,
        )
        return err
    }

    return nil // Ignore events this consumer doesn't care about
}

ScopedConsumer

If your consumer only cares about events from specific aggregate types, implement ScopedConsumer:

type ScopedConsumer interface {
    Consumer
    AggregateTypes() []string
}
func (p *UserListProjection) AggregateTypes() []string {
    return []string{"User"}
}

A ScopedConsumer still reads from the global log, but events not matching the declared aggregate types are filtered in-memory before reaching your Handle method. This reduces the work your consumer does without changing the underlying read pattern.

Note

Scoped consumers do not read from separate streams. All consumers read from the same global log — ScopedConsumer simply skips events that don't match the specified aggregate types.

Checkpoints

Each consumer tracks its position in the global log via a checkpoint — the GlobalPosition of the last event it successfully processed. When the consumer restarts, it resumes from its checkpoint.

Atomicity

The key reliability guarantee: the read model update and the checkpoint advance happen in the same transaction. Either both succeed or both fail:

BEGIN;
  -- Consumer processes event at GlobalPosition 42
  UPDATE user_list SET email = 'new@example.com' WHERE id = 'user-123';
  UPDATE consumer_checkpoints SET position = 42 WHERE consumer_name = 'user-list-projection';
COMMIT;

This means:

  • If the transaction commits, the read model is updated and the checkpoint advances — the consumer moves forward.
  • If the transaction fails, neither change persists — the consumer will reprocess the event on the next attempt.

At-Least-Once Delivery

Because checkpoints are only advanced on successful commit, events are delivered at least once. If a transaction fails mid-way (network error, database restart, application crash), the consumer will reprocess that event.

This means your consumers must be idempotent — processing the same event twice should produce the same result. See Idempotency for strategies.

Warning

Pupsourcing guarantees at-least-once delivery, not exactly-once. Design your consumers to handle duplicate events gracefully.

For more details on configuring and running consumers, see the Consumers page.


Global Position and Sequential Processing

The global event log is the backbone of pupsourcing's projection system. Understanding how it works is essential.

The Global Log

All events from all aggregates are stored in a single, ordered sequence. Each event receives a GlobalPosition — a BIGSERIAL value assigned by PostgreSQL that monotonically increases:

GlobalPosition 1: (User, user-123)  → UserRegistered
GlobalPosition 2: (Order, order-456) → OrderPlaced
GlobalPosition 3: (User, user-789)  → UserRegistered
GlobalPosition 4: (User, user-123)  → EmailChanged
GlobalPosition 5: (Order, order-456) → OrderConfirmed
...

Sequential Processing

Consumers process events in GlobalPosition order, one at a time. This sequential processing gives you strong guarantees:

  • Ordering: if event A was committed before event B, every consumer sees A before B
  • Consistency: consumers build their projections from a consistent view of history
  • Simplicity: no need for conflict resolution, merge logic, or coordination between consumers

Why a Single Log?

Pupsourcing deliberately uses a single sequential log with no partitioning, no sharding, and no parallel processing within a single consumer. This design optimizes for simplicity and correctness:

  • One log means one source of truth for ordering
  • Sequential processing means no out-of-order delivery
  • PostgreSQL's BIGSERIAL provides a reliable, gap-free sequence
  • A single PostgreSQL instance can handle millions of events

Tip

If you're worried about throughput: PostgreSQL can handle thousands of writes per second and consumers can process tens of thousands of events per second. The sequential model is more than sufficient for the vast majority of applications.


Idempotency

Because consumers use at-least-once delivery, your event handlers must be idempotent — processing the same event multiple times must produce the same outcome as processing it once.

Why It Matters

Consider this scenario:

  1. Consumer reads event at GlobalPosition 42
  2. Consumer updates the read model
  3. Transaction fails before the checkpoint advances (e.g., network error)
  4. Consumer restarts and re-reads event at GlobalPosition 42
  5. Consumer processes the same event again

If your handler simply inserts a row, the second attempt will fail or create a duplicate. If it increments a counter, the counter will be wrong.

Three Approaches

Use INSERT ... ON CONFLICT DO UPDATE so that reprocessing an event produces the same result:

func (p *UserListProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    switch event.EventType {
    case "UserRegistered":
        var payload UserRegisteredPayload
        if err := json.Unmarshal(event.Payload, &payload); err != nil {
            return err
        }

        _, err := tx.ExecContext(ctx,
            `INSERT INTO user_list (id, email, registered_at)
             VALUES ($1, $2, $3)
             ON CONFLICT (id) DO UPDATE
             SET email = EXCLUDED.email,
                 registered_at = EXCLUDED.registered_at`,
            event.AggregateID, payload.Email, event.CreatedAt,
        )
        return err
    }
    return nil
}

This is the simplest and most reliable approach for most use cases. If the row already exists (from a previous processing of the same event), it gets updated to the same values.

2. Position Tracking

Store the last processed GlobalPosition in your read model table:

func (p *StatsProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    // Check if already processed
    var lastPosition int64
    err := tx.QueryRowContext(ctx,
        `SELECT last_position FROM stats_metadata WHERE id = 'singleton'`,
    ).Scan(&lastPosition)

    if err == nil && event.GlobalPosition <= lastPosition {
        return nil // Already processed, skip
    }

    // Process the event...
    // Then update the position marker
    _, err = tx.ExecContext(ctx,
        `INSERT INTO stats_metadata (id, last_position) VALUES ('singleton', $1)
         ON CONFLICT (id) DO UPDATE SET last_position = $1`,
        event.GlobalPosition,
    )
    return err
}

This is useful when your handler performs non-idempotent operations (like incrementing counters) and you need to guarantee each event is processed exactly once.

3. Event ID Tracking

Track processed EventIDs and skip duplicates:

func (p *NotificationProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    // Try to record this event as processed
    result, err := tx.ExecContext(ctx,
        `INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING`,
        event.EventID,
    )
    if err != nil {
        return err
    }

    rows, _ := result.RowsAffected()
    if rows == 0 {
        return nil // Already processed
    }

    // Process the event...
    return nil
}

Tip

Start with upserts. They're the simplest approach and work for the majority of projections. Only reach for position or event ID tracking when your handler has side effects that can't be made naturally idempotent.


Design Principles

Pupsourcing is built around a set of deliberate design choices. Understanding them helps you use the library effectively and avoid fighting against its grain.

Library, Not Framework

Pupsourcing provides building blocks, not a runtime. There is no application container, no lifecycle management, no annotation processing. You structure your application however you want:

// You create and wire everything yourself
eventStore := postgres.NewEventStore(db)
worker := worker.New(db, eventStore, consumers)

// You decide when things start and stop
worker.Start(ctx)
defer worker.Stop()

This means more boilerplate but complete control. You choose your dependency injection approach, your HTTP framework, your configuration library, and your deployment strategy.

Explicit Dependencies

All dependencies are passed explicitly. No globals, no service locators, no implicit configuration:

// ✅ Dependencies are explicit and visible
func NewOrderService(
    db *sql.DB,
    eventStore store.EventStore,
    streamReader store.AggregateStreamReader,
) *OrderService {
    return &OrderService{db: db, es: eventStore, sr: streamReader}
}

// ❌ This is NOT how pupsourcing works — no hidden dependencies
func NewOrderService() *OrderService {
    return &OrderService{es: registry.GetEventStore()} // no magic
}

Every function signature tells you exactly what it needs. This makes code easier to test, reason about, and maintain.

Transaction-First

Every store operation takes *sql.Tx. This is the single most important design decision in pupsourcing.

By requiring you to pass a transaction, the library lets you compose multiple operations atomically:

tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

// All of these happen in ONE transaction:
result, err := eventStore.Append(ctx, tx, expectedVersion, events)  // 1. Persist events
err = updateReadModel(ctx, tx, result.Events)                       // 2. Update projection
err = writeOutboxMessage(ctx, tx, result.Events)                    // 3. Write outbox
err = storeEncryptionKeys(ctx, tx, keyID, encryptedKey)             // 4. Store keys

return tx.Commit() // All or nothing

If any step fails, the entire transaction rolls back — events, projections, outbox, everything.

Pull-Based Processing

Consumers pull events at their own pace. There is no push mechanism, no pub/sub, no backpressure protocol:

  • Consumers poll the event store for new events
  • Each consumer controls its own read rate
  • If a consumer falls behind, it simply has more events to catch up on
  • If a consumer fails, it retries from its last checkpoint

This model is simpler and more resilient than push-based alternatives. Slow consumers don't affect fast ones. Failed consumers don't create backpressure that blocks writers.

Database-Centric Coordination

PostgreSQL handles everything — not just event storage, but all coordination:

Concern PostgreSQL Feature
Event storage Tables with BIGSERIAL
Ordering BIGSERIAL global_position
Concurrency UNIQUE constraints
Version tracking aggregate_heads table
Consumer coordination Worker assignment tables
Leader election pg_try_advisory_lock
Real-time notifications pg_notify
Checkpoints Consumer checkpoint table

No Kafka. No Redis. No ZooKeeper. No RabbitMQ. Just PostgreSQL.

Note

This means your operational complexity is bounded by PostgreSQL. If you can run and maintain a PostgreSQL instance, you can run pupsourcing.


Common Patterns

These patterns come up frequently when building event-sourced applications with pupsourcing.

Read-Your-Writes

Append events and query the updated read model in the same transaction to guarantee the read model reflects the events you just wrote:

func (s *OrderService) PlaceOrder(ctx context.Context, orderID string, items []Item) (*OrderView, error) {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    // 1. Append events
    events := buildOrderPlacedEvents(orderID, items)
    _, err = s.eventStore.Append(ctx, tx, store.ExpectedVersion.NoStream(), events)
    if err != nil {
        return nil, err
    }

    // 2. Update read model in the same transaction
    if err := s.orderProjection.Handle(ctx, tx, events); err != nil {
        return nil, err
    }

    // 3. Query the read model — guaranteed to reflect the new events
    view, err := s.orderQueries.GetOrder(ctx, tx, orderID)
    if err != nil {
        return nil, err
    }

    if err := tx.Commit(); err != nil {
        return nil, err
    }

    return view, nil
}

Tip

This pattern eliminates the "stale read" problem where you write events but the read model hasn't caught up yet.

Aggregate Reconstruction

Read a stream and fold events to rebuild the current state of an aggregate:

type Order struct {
    ID        string
    Status    string
    Items     []OrderItem
    Total     int64
    Version   int64
}

func reconstructOrder(ctx context.Context, tx *sql.Tx, reader store.AggregateStreamReader, orderID string) (*Order, error) {
    stream, err := reader.ReadAggregateStream(ctx, tx, "Order", orderID, 0, 0)
    if err != nil {
        return nil, err
    }
    if stream.IsEmpty() {
        return nil, fmt.Errorf("order %s not found", orderID)
    }

    order := &Order{ID: orderID}
    for _, event := range stream.Events {
        switch event.EventType {
        case "OrderPlaced":
            var p OrderPlacedPayload
            json.Unmarshal(event.Payload, &p)
            order.Status = "placed"
            order.Items = p.Items
            order.Total = p.Total

        case "ItemAdded":
            var p ItemAddedPayload
            json.Unmarshal(event.Payload, &p)
            order.Items = append(order.Items, p.Item)
            order.Total += p.Item.Price

        case "OrderConfirmed":
            order.Status = "confirmed"

        case "OrderCancelled":
            order.Status = "cancelled"
        }
    }
    order.Version = stream.Version()

    return order, nil
}

This pattern is the foundation of the "read, decide, append" cycle:

  1. Read the aggregate's stream and reconstruct current state
  2. Decide what events to produce based on current state and the incoming command
  3. Append the new events with the expected version from step 1

Event Upcasting

Handle multiple schema versions of an event in your consumers:

func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    switch event.EventType {
    case "OrderPlaced":
        switch event.EventVersion {
        case 1:
            // Original schema: no shipping address
            var p OrderPlacedV1
            if err := json.Unmarshal(event.Payload, &p); err != nil {
                return err
            }
            return p.insertOrder(ctx, tx, event.AggregateID, p, "")

        case 2:
            // V2 added shipping address
            var p OrderPlacedV2
            if err := json.Unmarshal(event.Payload, &p); err != nil {
                return err
            }
            return p.insertOrder(ctx, tx, event.AggregateID, p, p.ShippingAddress)

        default:
            return fmt.Errorf("unsupported OrderPlaced version: %d", event.EventVersion)
        }
    }
    return nil
}

func (p *OrderProjection) insertOrder(ctx context.Context, tx *sql.Tx, id string, payload any, address string) error {
    _, err := tx.ExecContext(ctx,
        `INSERT INTO order_view (id, shipping_address)
         VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET shipping_address = EXCLUDED.shipping_address`,
        id, address,
    )
    return err
}

Note

By switching on EventVersion, you handle old and new events in the same consumer. Old events in the store don't need to be migrated — your consumer handles them at read time.


See Also

  • Getting Started — set up your first event store and write your first events
  • Consumers — configure and run consumers with the worker
  • Outbox — reliably publish events to external systems
  • Deployment — run pupsourcing in production
  • Observability — monitor your event store and consumers