Core Concepts¶
This guide covers the fundamental concepts behind event sourcing and how pupsourcing implements them. Whether you're new to event sourcing or evaluating pupsourcing for a project, start here.
Table of Contents¶
- What Is Event Sourcing?
- Events
- Aggregates
- The Event Store
- Optimistic Concurrency
- Consumers and Projections
- Global Position and Sequential Processing
- Idempotency
- Design Principles
- Common Patterns
What Is Event Sourcing?¶
Event sourcing is a pattern where you store every state change as an immutable event instead of overwriting the current state of a record. Rather than keeping only the latest snapshot, you keep the full history of everything that happened.
The Bank Statement Analogy¶
Think of a bank account. A traditional database stores only your current balance — say, $1,250.00. If someone asks "how did you get to $1,250?", there's no way to answer from the data alone.
A bank statement works differently. It records every transaction:
| # | Description | Amount | Balance |
|---|---|---|---|
| 1 | Initial deposit | +$1,000.00 | $1,000.00 |
| 2 | Coffee shop | −$4.50 | $995.50 |
| 3 | Salary | +$500.00 | $1,495.50 |
| 4 | Electric bill | −$120.00 | $1,375.50 |
| 5 | Grocery store | −$125.50 | $1,250.00 |
From this sequence of transactions, you can:
- Reconstruct the balance at any point in time
- Audit every change that occurred
- Detect errors or fraud by replaying history
- Build new views without changing the source data
Event sourcing applies this same idea to your application's data.
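The replay idea fits in a few lines of Go. This is a standalone, illustrative sketch (not pupsourcing code): folding the statement's transactions reconstructs the balance at any point in time.

```go
package main

import "fmt"

// Transaction is one line on the statement: a signed amount in cents.
type Transaction struct {
	Description string
	Amount      int64 // negative for debits
}

// balanceAt replays the first n transactions and returns the balance.
// Replaying all of them yields the current balance; replaying fewer
// reconstructs the balance at an earlier point in time.
func balanceAt(txs []Transaction, n int) int64 {
	var balance int64
	for _, t := range txs[:n] {
		balance += t.Amount
	}
	return balance
}

func main() {
	statement := []Transaction{
		{"Initial deposit", 100000},
		{"Coffee shop", -450},
		{"Salary", 50000},
		{"Electric bill", -12000},
		{"Grocery store", -12550},
	}
	fmt.Println(balanceAt(statement, 2))              // 99550 ($995.50, after transaction 2)
	fmt.Println(balanceAt(statement, len(statement))) // 125000 ($1,250.00, current balance)
}
```

The same fold, stopped early, answers "what was the balance after transaction 2?" without storing anything beyond the transactions themselves.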
CRUD vs Event Sourcing¶
Traditional CRUD — you mutate a row in place:
-- Create a user
INSERT INTO users (id, email, status) VALUES ('user-123', 'alice@example.com', 'pending');
-- Activate the user — previous state is lost
UPDATE users SET status = 'active' WHERE id = 'user-123';
-- Change email — previous email is lost
UPDATE users SET email = 'alice@newdomain.com' WHERE id = 'user-123';
After these operations, you only know the current state. When did the user activate? What was their original email? That information is gone.
Event sourcing — you append immutable events:
UserRegistered { email: "alice@example.com" } → version 1
UserActivated { activated_at: "2025-01-15T..." } → version 2
EmailChanged { old: "alice@example.com", new: "alice@newdomain.com" } → version 3
The current state is derived by replaying these events. Every previous state is preserved.
Benefits¶
- Complete audit trail: every change is recorded with full context
- Temporal queries: reconstruct state at any point in time
- Replay capability: rebuild read models, fix bugs in projections, add new views — all from the same event history
- Flexibility: new consumers can process the entire event log to build views that didn't exist when events were originally written
Events¶
An event is an immutable fact that records something that happened in your system. Events are always named in the past tense because they describe things that already occurred:
- UserRegistered
- OrderPlaced
- PaymentProcessed
- InventoryAdjusted
Note
Events are facts, not commands. RegisterUser is a command (a request to do something). UserRegistered is an event (confirmation that it happened).
Event vs PersistedEvent¶
Pupsourcing distinguishes between two event types:
Event — what you create before persistence:
event := store.Event{
	AggregateType: "Order",
	AggregateID:   "order-456",
	EventType:     "OrderPlaced",
	EventVersion:  1,
	EventID:       uuid.New(),
	Payload:       payload,  // []byte — typically JSON
	Metadata:      metadata, // []byte — cross-cutting concerns
	CorrelationID: store.NullString{String: correlationID, Valid: true},
	CausationID:   store.NullString{String: causationID, Valid: true},
}
An Event does not have a position or aggregate version — those are assigned by the store during persistence.
PersistedEvent — what you get back after persistence:
type PersistedEvent struct {
	// Everything from Event, plus:
	GlobalPosition   int64 // Position in the global event log
	AggregateVersion int64 // Version within this aggregate's stream
}
The store assigns GlobalPosition (a monotonically increasing sequence across all events) and AggregateVersion (a sequence within the specific aggregate's stream) when the event is written.
Key Event Fields¶
| Field | Type | Purpose |
|---|---|---|
| `AggregateType` | `string` | Category of aggregate (e.g., `"Order"`, `"User"`) |
| `AggregateID` | `string` | Unique identifier within the aggregate type |
| `EventType` | `string` | Name of the event (e.g., `"OrderPlaced"`) |
| `EventVersion` | `int` | Schema version of the payload — used for evolution, not sequencing |
| `EventID` | `uuid.UUID` | Idempotency key you assign — not the same as position |
| `Payload` | `[]byte` | Serialized event data (typically JSON) |
| `Metadata` | `[]byte` | Cross-cutting data: trace IDs, user context — not domain data |
| `CausationID` | `NullString` | ID of the command or event that caused this event |
| `CorrelationID` | `NullString` | Groups related events across aggregates |
| `TraceID` | `NullString` | Distributed tracing identifier |
EventVersion: Schema Evolution¶
EventVersion represents the schema version of the event payload, not the sequence of the event. When you change the structure of an event's payload, you increment the version:
// Version 1: original schema
store.Event{
	EventType:    "OrderPlaced",
	EventVersion: 1,
	Payload:      []byte(`{"item_id": "item-1", "quantity": 1}`),
}

// Version 2: added shipping_address field
store.Event{
	EventType:    "OrderPlaced",
	EventVersion: 2,
	Payload:      []byte(`{"item_id": "item-1", "quantity": 1, "shipping_address": "..."}`),
}
Consumers handle multiple versions using type switches. See Event Upcasting in Common Patterns.
EventID: Idempotency Key¶
The EventID is a UUID that you assign before appending. It serves as an idempotency key — the database enforces uniqueness on this field. If you retry an append with the same EventID, the database will reject duplicates.
Warning
EventID is not the same as GlobalPosition. The EventID is your identifier for the event. GlobalPosition is the store's monotonically increasing sequence number.
Payload and Metadata¶
- Payload (`[]byte`): the domain-specific data of the event. Typically serialized as JSON. This is the "what happened" — the order items, the new email address, the payment amount.
- Metadata (`[]byte`): cross-cutting concerns that are not part of the domain model. Examples include the user who initiated the action, request IDs, or deployment version. Stored as JSONB in PostgreSQL.
Tip
Keep domain data in the payload and operational/infrastructure data in metadata. This separation makes it easier to evolve your domain model independently of your infrastructure concerns.
Aggregates¶
An aggregate is a cluster of domain objects treated as a single unit for data consistency. In Domain-Driven Design, aggregates define transaction boundaries — all changes within an aggregate are atomic.
In pupsourcing, an aggregate is identified by two values:
- AggregateType: a string that categorizes the aggregate (e.g., `"User"`, `"Order"`, `"ShoppingCart"`)
- AggregateID: a string that uniquely identifies an instance within the type (e.g., `"user-123"`, `"order-456"`)
Together, they form a unique identifier for an aggregate:
("User", "user-123") → a specific user's event stream
("Order", "order-456") → a specific order's event stream
("User", "user-789") → a different user's event stream
Aggregate Streams¶
Every aggregate has a stream — the ordered sequence of events that belong to it. The Stream type represents this:
type Stream struct {
	AggregateType string
	AggregateID   string
	Events        []PersistedEvent
}
A stream provides these methods:
- `Version()` — returns the current version (the `AggregateVersion` of the last event)
- `IsEmpty()` — returns true if the aggregate has no events
- `Len()` — returns the number of events in the stream
AggregateVersion¶
Each event within an aggregate's stream gets an AggregateVersion — a monotonically increasing integer starting at 1:
("Order", "order-456") stream:
AggregateVersion 1: OrderPlaced
AggregateVersion 2: ItemAdded
AggregateVersion 3: ItemAdded
AggregateVersion 4: OrderConfirmed
This version is assigned by the store on write and is used for optimistic concurrency control.
Note
AggregateVersion is scoped to a single aggregate. Two different aggregates both start at version 1 and increment independently.
The Event Store¶
The event store is an append-only log. Events are never modified or deleted — only new events are appended. This immutability is what gives event sourcing its power.
Two Dimensions of Ordering¶
Every persisted event has two sequence numbers:
- AggregateVersion — the position within a single aggregate's stream. Used for reconstructing aggregate state and concurrency control.
- GlobalPosition — the position within the global event log across all aggregates. Assigned as a `BIGSERIAL` by PostgreSQL. Used by consumers to process events in order.
| GlobalPosition | AggregateType | AggregateID | AggregateVersion | EventType |
|---|---|---|---|---|
| 1 | User | user-123 | 1 | UserRegistered |
| 2 | Order | order-456 | 1 | OrderPlaced |
| 3 | User | user-123 | 2 | UserActivated |
| 4 | Order | order-456 | 2 | ItemAdded |
| 5 | Order | order-789 | 1 | OrderPlaced |
| 6 | User | user-123 | 3 | EmailChanged |
Notice how GlobalPosition increases across all events, while AggregateVersion increases independently per aggregate.
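The two counters can be modeled in a few lines of Go. This is an illustrative in-memory sketch, not the store's implementation (which uses `BIGSERIAL` and the `aggregate_heads` table): one global counter shared by every aggregate, plus one counter per aggregate.

```go
package main

import "fmt"

// aggregateKey identifies one aggregate's stream.
type aggregateKey struct{ Type, ID string }

// eventLog models the two sequence numbers assigned on write:
// a single global counter plus one counter per aggregate.
type eventLog struct {
	globalPosition int64
	versions       map[aggregateKey]int64
}

// append assigns both numbers to a new event for the given aggregate.
func (l *eventLog) append(aggType, aggID string) (global, version int64) {
	l.globalPosition++ // increases across ALL aggregates
	k := aggregateKey{aggType, aggID}
	l.versions[k]++ // increases independently per aggregate
	return l.globalPosition, l.versions[k]
}

func main() {
	l := &eventLog{versions: map[aggregateKey]int64{}}
	fmt.Println(l.append("User", "user-123"))   // 1 1
	fmt.Println(l.append("Order", "order-456")) // 2 1
	fmt.Println(l.append("User", "user-123"))   // 3 2
}
```

The three appends reproduce the first three rows of the table above: the global position ticks on every write, while each aggregate's version ticks only on its own writes.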
Transaction-First Design¶
All event store operations require a *sql.Tx — you provide the transaction, you control the boundaries:
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}
defer tx.Rollback()

result, err := eventStore.Append(ctx, tx, expectedVersion, events)
if err != nil {
	return err
}

// You can do more work in the same transaction:
// - Update a read model
// - Write to an outbox
// - Store encryption keys

if err := tx.Commit(); err != nil {
	return err
}
This is a deliberate design choice. By requiring explicit transactions, pupsourcing lets you compose multiple operations atomically. See Design Principles for more on this philosophy.
The Append Operation¶
The core write operation is Append:
type EventStore interface {
	Append(
		ctx context.Context,
		tx *sql.Tx,
		expectedVersion ExpectedVersion,
		events []Event,
	) (AppendResult, error)
}
Append takes a slice of events and writes them atomically. All events in a single Append call must belong to the same aggregate.
The returned AppendResult contains the persisted events with their assigned positions:
type AppendResult struct {
	Events          []PersistedEvent
	GlobalPositions []int64
}
AppendResult provides helper methods:
- `FromVersion()` — the `AggregateVersion` of the first event written
- `ToVersion()` — the `AggregateVersion` of the last event written
Reading Events¶
Pupsourcing provides two reading interfaces:
EventReader — read from the global log:
type EventReader interface {
	ReadEvents(
		ctx context.Context,
		tx *sql.Tx,
		fromPosition int64,
		limit int,
	) ([]PersistedEvent, error)
}
AggregateStreamReader — read a single aggregate's stream:
type AggregateStreamReader interface {
	ReadAggregateStream(
		ctx context.Context,
		tx *sql.Tx,
		aggregateType string,
		aggregateID string,
		fromVersion int64,
		toVersion int64,
	) (Stream, error)
}
PostgreSQL Schema¶
Under the hood, pupsourcing uses two PostgreSQL tables:
events — the append-only event log:
| Column | Type | Description |
|---|---|---|
| `global_position` | `BIGSERIAL PK` | Auto-incrementing global sequence |
| `aggregate_type` | `TEXT` | Aggregate category |
| `aggregate_id` | `TEXT` | Aggregate instance |
| `aggregate_version` | `BIGINT` | Version within aggregate stream |
| `event_id` | `UUID UNIQUE` | Your idempotency key |
| `event_type` | `TEXT` | Event name |
| `event_version` | `INT` | Payload schema version |
| `payload` | `BYTEA` | Serialized event data |
| `metadata` | `JSONB` | Cross-cutting metadata |
| `trace_id` | `TEXT` | Distributed tracing ID |
| `correlation_id` | `TEXT` | Correlation identifier |
| `causation_id` | `TEXT` | Causation identifier |
| `created_at` | `TIMESTAMPTZ` | When the event was written |
A UNIQUE constraint on (aggregate_type, aggregate_id, aggregate_version) provides a database-level safety net for concurrency control.
aggregate_heads — tracks the current version of each aggregate:
| Column | Type | Description |
|---|---|---|
| `aggregate_type` | `TEXT` | Part of composite PK |
| `aggregate_id` | `TEXT` | Part of composite PK |
| `aggregate_version` | `BIGINT` | Current version |
This table enables O(1) version lookups — instead of scanning the events table to find the latest version, the store reads a single row from aggregate_heads.
Optimistic Concurrency¶
When multiple processes try to write to the same aggregate simultaneously, you need a way to prevent conflicts. Pupsourcing uses optimistic concurrency control — rather than locking the aggregate before writing, you declare what version you expect and the store verifies it at write time.
ExpectedVersion¶
Every Append call includes an ExpectedVersion that tells the store what state you expect the aggregate to be in:
// The aggregate must not exist yet (first event)
store.ExpectedVersion.NoStream()
// The aggregate must be at exactly this version
store.ExpectedVersion.Exact(3)
// Skip the version check entirely (use with care)
store.ExpectedVersion.Any()
NoStream() — use when creating a new aggregate. The store verifies no events exist for this (AggregateType, AggregateID) pair. If events already exist, the append fails.
Exact(version) — use when appending to an existing aggregate. You first read the aggregate's current version, then pass it as the expected version. If another process wrote events between your read and write, the versions won't match and the append fails.
Any() — skips the concurrency check entirely. Use this only when you're certain there's no contention, or when conflicting writes are acceptable.
Warning
Using Any() means you lose protection against concurrent writes. It should only be used for append-only aggregates where every event is independent and the order doesn't matter for correctness.
Two Layers of Protection¶
Pupsourcing provides concurrency safety at two levels:
- Application layer: the store reads the current version from `aggregate_heads` and validates it against your `ExpectedVersion` before inserting events.
- Database layer: the `UNIQUE` constraint on `(aggregate_type, aggregate_id, aggregate_version)` acts as a safety net. Even if the application check is bypassed due to a race condition, the database rejects conflicting inserts.
Handling Concurrency Conflicts¶
When a concurrency conflict occurs, the store returns ErrOptimisticConcurrency. The standard approach is to retry:
func placeOrder(ctx context.Context, db *sql.DB, es store.EventStore, orderID string, items []Item) error {
	const maxRetries = 3
	for attempt := 0; attempt < maxRetries; attempt++ {
		// Wrap each attempt in a closure so the deferred Rollback runs at
		// the end of the attempt, not when placeOrder finally returns.
		err := func() error {
			tx, err := db.BeginTx(ctx, nil)
			if err != nil {
				return err
			}
			defer tx.Rollback()

			// Read current state
			stream, err := es.ReadAggregateStream(ctx, tx, "Order", orderID, 0, 0)
			if err != nil {
				return err
			}

			// Determine expected version
			var expectedVersion store.ExpectedVersion
			if stream.IsEmpty() {
				expectedVersion = store.ExpectedVersion.NoStream()
			} else {
				expectedVersion = store.ExpectedVersion.Exact(stream.Version())
			}

			// Build and append events
			events := buildOrderEvents(stream, items)
			if _, err := es.Append(ctx, tx, expectedVersion, events); err != nil {
				return err
			}
			return tx.Commit()
		}()
		if errors.Is(err, store.ErrOptimisticConcurrency) {
			continue // Retry with fresh state
		}
		return err // nil on success, or a non-retryable error
	}
	return fmt.Errorf("failed to place order after %d retries", maxRetries)
}
Tip
The retry loop re-reads the aggregate's current state on each attempt. This ensures your business logic runs against the latest version. For most applications, 2–3 retries are sufficient.
Consumers and Projections¶
Consumers are components that process events from the global log to build projections — read-optimized views of your data.
In a traditional CRUD system, your database tables serve double duty as both the write model and the read model. With event sourcing, you separate these concerns: the event store is your write model, and projections are your read models.
The Consumer Interface¶
A consumer implements two methods:
type Consumer interface {
	Name() string
	Handle(ctx context.Context, tx *sql.Tx, event PersistedEvent) error
}
- `Name()`: a unique identifier for this consumer. Used to track checkpoints — where this consumer has read up to in the global log.
- `Handle()`: called for each event, in order. Receives a transaction that encompasses both the event processing and the checkpoint advance.
Example — a consumer that maintains a user list projection:
type UserListProjection struct{}

func (p *UserListProjection) Name() string {
	return "user-list-projection"
}

func (p *UserListProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	switch event.EventType {
	case "UserRegistered":
		var payload UserRegisteredPayload
		if err := json.Unmarshal(event.Payload, &payload); err != nil {
			return err
		}
		_, err := tx.ExecContext(ctx,
			`INSERT INTO user_list (id, email, status, registered_at)
			 VALUES ($1, $2, 'active', $3)
			 ON CONFLICT (id) DO UPDATE SET email = $2, status = 'active'`,
			event.AggregateID, payload.Email, event.CreatedAt,
		)
		return err
	case "EmailChanged":
		var payload EmailChangedPayload
		if err := json.Unmarshal(event.Payload, &payload); err != nil {
			return err
		}
		_, err := tx.ExecContext(ctx,
			`UPDATE user_list SET email = $1 WHERE id = $2`,
			payload.NewEmail, event.AggregateID,
		)
		return err
	}
	return nil // Ignore events this consumer doesn't care about
}
ScopedConsumer¶
If your consumer only cares about events from specific aggregate types, implement ScopedConsumer:
type ScopedConsumer interface {
	Consumer
	AggregateTypes() []string
}

func (p *UserListProjection) AggregateTypes() []string {
	return []string{"User"}
}
A ScopedConsumer still reads from the global log, but events not matching the declared aggregate types are filtered in-memory before reaching your Handle method. This reduces the work your consumer does without changing the underlying read pattern.
Note
Scoped consumers do not read from separate streams. All consumers read from the same global log — ScopedConsumer simply skips events that don't match the specified aggregate types.
Checkpoints¶
Each consumer tracks its position in the global log via a checkpoint — the GlobalPosition of the last event it successfully processed. When the consumer restarts, it resumes from its checkpoint.
Atomicity¶
The key reliability guarantee: the read model update and the checkpoint advance happen in the same transaction. Either both succeed or both fail:
BEGIN;
-- Consumer processes event at GlobalPosition 42
UPDATE user_list SET email = 'new@example.com' WHERE id = 'user-123';
UPDATE consumer_checkpoints SET position = 42 WHERE consumer_name = 'user-list-projection';
COMMIT;
This means:
- If the transaction commits, the read model is updated and the checkpoint advances — the consumer moves forward.
- If the transaction fails, neither change persists — the consumer will reprocess the event on the next attempt.
At-Least-Once Delivery¶
Because checkpoints are only advanced on successful commit, events are delivered at least once. If a transaction fails mid-way (network error, database restart, application crash), the consumer will reprocess that event.
This means your consumers must be idempotent — processing the same event twice should produce the same result. See Idempotency for strategies.
Warning
Pupsourcing guarantees at-least-once delivery, not exactly-once. Design your consumers to handle duplicate events gracefully.
For more details on configuring and running consumers, see the Consumers page.
Global Position and Sequential Processing¶
The global event log is the backbone of pupsourcing's projection system. Understanding how it works is essential.
The Global Log¶
All events from all aggregates are stored in a single, ordered sequence. Each event receives a GlobalPosition — a BIGSERIAL value assigned by PostgreSQL that monotonically increases:
GlobalPosition 1: (User, user-123) → UserRegistered
GlobalPosition 2: (Order, order-456) → OrderPlaced
GlobalPosition 3: (User, user-789) → UserRegistered
GlobalPosition 4: (User, user-123) → EmailChanged
GlobalPosition 5: (Order, order-456) → OrderConfirmed
...
Sequential Processing¶
Consumers process events in GlobalPosition order, one at a time. This sequential processing gives you strong guarantees:
- Ordering: if event A was committed before event B, every consumer sees A before B
- Consistency: consumers build their projections from a consistent view of history
- Simplicity: no need for conflict resolution, merge logic, or coordination between consumers
Why a Single Log?¶
Pupsourcing deliberately uses a single sequential log with no partitioning, no sharding, and no parallel processing within a single consumer. This design optimizes for simplicity and correctness:
- One log means one source of truth for ordering
- Sequential processing means no out-of-order delivery
- PostgreSQL's `BIGSERIAL` provides a reliable, monotonically increasing sequence
- A single PostgreSQL instance can handle millions of events
Tip
If you're worried about throughput: PostgreSQL can handle thousands of writes per second and consumers can process tens of thousands of events per second. The sequential model is more than sufficient for the vast majority of applications.
Idempotency¶
Because consumers use at-least-once delivery, your event handlers must be idempotent — processing the same event multiple times must produce the same outcome as processing it once.
Why It Matters¶
Consider this scenario:
- Consumer reads event at GlobalPosition 42
- Consumer updates the read model
- Transaction fails before the checkpoint advances (e.g., network error)
- Consumer restarts and re-reads event at GlobalPosition 42
- Consumer processes the same event again
If your handler simply inserts a row, the second attempt will fail or create a duplicate. If it increments a counter, the counter will be wrong.
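The counter problem can be shown with a minimal in-memory sketch (illustrative, not pupsourcing code): a naive handler double-counts a redelivered event, while tracking seen event IDs keeps the result stable.

```go
package main

import "fmt"

// counter accumulates a total; seen records which events were counted.
type counter struct {
	total int
	seen  map[string]bool
}

// handleNaive increments unconditionally: a redelivery corrupts the total.
func (c *counter) handleNaive(eventID string) {
	c.total++
}

// handleIdempotent counts each event at most once, however many
// times it is delivered.
func (c *counter) handleIdempotent(eventID string) {
	if c.seen[eventID] {
		return // duplicate delivery, already counted
	}
	c.seen[eventID] = true
	c.total++
}

func main() {
	deliveries := []string{"evt-1", "evt-2", "evt-2"} // evt-2 redelivered

	naive := &counter{seen: map[string]bool{}}
	for _, id := range deliveries {
		naive.handleNaive(id)
	}
	fmt.Println(naive.total) // 3 (wrong: evt-2 counted twice)

	safe := &counter{seen: map[string]bool{}}
	for _, id := range deliveries {
		safe.handleIdempotent(id)
	}
	fmt.Println(safe.total) // 2 (correct)
}
```

The "Event ID Tracking" approach below applies the same idea with a database table instead of an in-memory map.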
Three Approaches¶
1. Upsert (Recommended)¶
Use INSERT ... ON CONFLICT DO UPDATE so that reprocessing an event produces the same result:
func (p *UserListProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	switch event.EventType {
	case "UserRegistered":
		var payload UserRegisteredPayload
		if err := json.Unmarshal(event.Payload, &payload); err != nil {
			return err
		}
		_, err := tx.ExecContext(ctx,
			`INSERT INTO user_list (id, email, registered_at)
			 VALUES ($1, $2, $3)
			 ON CONFLICT (id) DO UPDATE
			 SET email = EXCLUDED.email,
			     registered_at = EXCLUDED.registered_at`,
			event.AggregateID, payload.Email, event.CreatedAt,
		)
		return err
	}
	return nil
}
This is the simplest and most reliable approach for most use cases. If the row already exists (from a previous processing of the same event), it gets updated to the same values.
2. Position Tracking¶
Store the last processed GlobalPosition in your read model table:
func (p *StatsProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	// Check if already processed
	var lastPosition int64
	err := tx.QueryRowContext(ctx,
		`SELECT last_position FROM stats_metadata WHERE id = 'singleton'`,
	).Scan(&lastPosition)
	if err != nil && !errors.Is(err, sql.ErrNoRows) {
		return err // a real failure, not just "no row yet"
	}
	if event.GlobalPosition <= lastPosition {
		return nil // Already processed, skip
	}

	// Process the event...

	// Then update the position marker
	_, err = tx.ExecContext(ctx,
		`INSERT INTO stats_metadata (id, last_position) VALUES ('singleton', $1)
		 ON CONFLICT (id) DO UPDATE SET last_position = $1`,
		event.GlobalPosition,
	)
	return err
}
This is useful when your handler performs non-idempotent operations (like incrementing counters) and you need each event's effect applied only once, even though the event itself may be delivered more than once.
3. Event ID Tracking¶
Track processed EventIDs and skip duplicates:
func (p *NotificationProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	// Try to record this event as processed
	result, err := tx.ExecContext(ctx,
		`INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING`,
		event.EventID,
	)
	if err != nil {
		return err
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return err
	}
	if rows == 0 {
		return nil // Already processed
	}

	// Process the event...
	return nil
}
Tip
Start with upserts. They're the simplest approach and work for the majority of projections. Only reach for position or event ID tracking when your handler has side effects that can't be made naturally idempotent.
Design Principles¶
Pupsourcing is built around a set of deliberate design choices. Understanding them helps you use the library effectively and avoid fighting against its grain.
Library, Not Framework¶
Pupsourcing provides building blocks, not a runtime. There is no application container, no lifecycle management, no annotation processing. You structure your application however you want:
// You create and wire everything yourself
eventStore := postgres.NewEventStore(db)
worker := worker.New(db, eventStore, consumers)

// You decide when things start and stop
worker.Start(ctx)
defer worker.Stop()
This means more boilerplate but complete control. You choose your dependency injection approach, your HTTP framework, your configuration library, and your deployment strategy.
Explicit Dependencies¶
All dependencies are passed explicitly. No globals, no service locators, no implicit configuration:
// ✅ Dependencies are explicit and visible
func NewOrderService(
	db *sql.DB,
	eventStore store.EventStore,
	streamReader store.AggregateStreamReader,
) *OrderService {
	return &OrderService{db: db, es: eventStore, sr: streamReader}
}

// ❌ This is NOT how pupsourcing works — no hidden dependencies
func NewOrderService() *OrderService {
	return &OrderService{es: registry.GetEventStore()} // no magic
}
Every function signature tells you exactly what it needs. This makes code easier to test, reason about, and maintain.
Transaction-First¶
Every store operation takes *sql.Tx. This is the single most important design decision in pupsourcing.
By requiring you to pass a transaction, the library lets you compose multiple operations atomically:
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}
defer tx.Rollback()

// All of these happen in ONE transaction:
result, err := eventStore.Append(ctx, tx, expectedVersion, events) // 1. Persist events
err = updateReadModel(ctx, tx, result.Events)                      // 2. Update projection
err = writeOutboxMessage(ctx, tx, result.Events)                   // 3. Write outbox
err = storeEncryptionKeys(ctx, tx, keyID, encryptedKey)            // 4. Store keys

return tx.Commit() // All or nothing
If any step fails, the entire transaction rolls back — events, projections, outbox, everything.
Pull-Based Processing¶
Consumers pull events at their own pace. There is no push mechanism, no pub/sub, no backpressure protocol:
- Consumers poll the event store for new events
- Each consumer controls its own read rate
- If a consumer falls behind, it simply has more events to catch up on
- If a consumer fails, it retries from its last checkpoint
This model is simpler and more resilient than push-based alternatives. Slow consumers don't affect fast ones. Failed consumers don't create backpressure that blocks writers.
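The pull loop can be sketched in-memory. This is illustrative only; a real consumer reads from the events table with `ReadEvents` and sleeps between empty polls.

```go
package main

import "fmt"

// readAfter models polling the store: return up to limit events with a
// position greater than the checkpoint, in order.
func readAfter(positions []int64, checkpoint int64, limit int) []int64 {
	var batch []int64
	for _, pos := range positions {
		if pos > checkpoint {
			batch = append(batch, pos)
			if len(batch) == limit {
				break
			}
		}
	}
	return batch
}

func main() {
	positions := []int64{1, 2, 3, 4, 5}
	checkpoint := int64(0)

	// Each iteration pulls the next batch at the consumer's own pace.
	for {
		batch := readAfter(positions, checkpoint, 2)
		if len(batch) == 0 {
			break // caught up; a real consumer would sleep, then poll again
		}
		for _, pos := range batch {
			checkpoint = pos // process the event, then advance
		}
	}
	fmt.Println(checkpoint) // 5
}
```

Because the consumer decides when to call `readAfter`, a slow consumer simply accumulates a backlog; nothing pushes events at it faster than it can handle them.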
Database-Centric Coordination¶
PostgreSQL handles everything — not just event storage, but all coordination:
| Concern | PostgreSQL Feature |
|---|---|
| Event storage | Tables with BIGSERIAL |
| Ordering | BIGSERIAL global_position |
| Concurrency | UNIQUE constraints |
| Version tracking | aggregate_heads table |
| Consumer coordination | Worker assignment tables |
| Leader election | pg_try_advisory_lock |
| Real-time notifications | pg_notify |
| Checkpoints | Consumer checkpoint table |
No Kafka. No Redis. No ZooKeeper. No RabbitMQ. Just PostgreSQL.
Note
This means your operational complexity is bounded by PostgreSQL. If you can run and maintain a PostgreSQL instance, you can run pupsourcing.
Common Patterns¶
These patterns come up frequently when building event-sourced applications with pupsourcing.
Read-Your-Writes¶
Append events and query the updated read model in the same transaction to guarantee the read model reflects the events you just wrote:
func (s *OrderService) PlaceOrder(ctx context.Context, orderID string, items []Item) (*OrderView, error) {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	// 1. Append events
	events := buildOrderPlacedEvents(orderID, items)
	result, err := s.eventStore.Append(ctx, tx, store.ExpectedVersion.NoStream(), events)
	if err != nil {
		return nil, err
	}

	// 2. Update the read model in the same transaction. The projection's
	// Handle takes one PersistedEvent at a time, so feed it the
	// persisted events from the append result.
	for _, ev := range result.Events {
		if err := s.orderProjection.Handle(ctx, tx, ev); err != nil {
			return nil, err
		}
	}

	// 3. Query the read model — guaranteed to reflect the new events
	view, err := s.orderQueries.GetOrder(ctx, tx, orderID)
	if err != nil {
		return nil, err
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}
	return view, nil
}
Tip
This pattern eliminates the "stale read" problem where you write events but the read model hasn't caught up yet.
Aggregate Reconstruction¶
Read a stream and fold events to rebuild the current state of an aggregate:
type Order struct {
	ID      string
	Status  string
	Items   []OrderItem
	Total   int64
	Version int64
}
func reconstructOrder(ctx context.Context, tx *sql.Tx, reader store.AggregateStreamReader, orderID string) (*Order, error) {
	stream, err := reader.ReadAggregateStream(ctx, tx, "Order", orderID, 0, 0)
	if err != nil {
		return nil, err
	}
	if stream.IsEmpty() {
		return nil, fmt.Errorf("order %s not found", orderID)
	}

	order := &Order{ID: orderID}
	for _, event := range stream.Events {
		switch event.EventType {
		case "OrderPlaced":
			var p OrderPlacedPayload
			if err := json.Unmarshal(event.Payload, &p); err != nil {
				return nil, err
			}
			order.Status = "placed"
			order.Items = p.Items
			order.Total = p.Total
		case "ItemAdded":
			var p ItemAddedPayload
			if err := json.Unmarshal(event.Payload, &p); err != nil {
				return nil, err
			}
			order.Items = append(order.Items, p.Item)
			order.Total += p.Item.Price
		case "OrderConfirmed":
			order.Status = "confirmed"
		case "OrderCancelled":
			order.Status = "cancelled"
		}
	}
	order.Version = stream.Version()
	return order, nil
}
This pattern is the foundation of the "read, decide, append" cycle:
- Read the aggregate's stream and reconstruct current state
- Decide what events to produce based on current state and the incoming command
- Append the new events with the expected version from step 1
Event Upcasting¶
Handle multiple schema versions of an event in your consumers:
func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	switch event.EventType {
	case "OrderPlaced":
		switch event.EventVersion {
		case 1:
			// Original schema: no shipping address
			var v1 OrderPlacedV1
			if err := json.Unmarshal(event.Payload, &v1); err != nil {
				return err
			}
			return p.insertOrder(ctx, tx, event.AggregateID, "")
		case 2:
			// V2 added shipping address
			var v2 OrderPlacedV2
			if err := json.Unmarshal(event.Payload, &v2); err != nil {
				return err
			}
			return p.insertOrder(ctx, tx, event.AggregateID, v2.ShippingAddress)
		default:
			return fmt.Errorf("unsupported OrderPlaced version: %d", event.EventVersion)
		}
	}
	return nil
}

func (p *OrderProjection) insertOrder(ctx context.Context, tx *sql.Tx, id string, address string) error {
	_, err := tx.ExecContext(ctx,
		`INSERT INTO order_view (id, shipping_address)
		 VALUES ($1, $2)
		 ON CONFLICT (id) DO UPDATE SET shipping_address = EXCLUDED.shipping_address`,
		id, address,
	)
	return err
}
Note
By switching on EventVersion, you handle old and new events in the same consumer. Old events in the store don't need to be migrated — your consumer handles them at read time.
See Also¶
- Getting Started — set up your first event store and write your first events
- Consumers — configure and run consumers with the worker
- Outbox — reliably publish events to external systems
- Deployment — run pupsourcing in production
- Observability — monitor your event store and consumers