Core Concepts¶
Fundamental principles of event sourcing with pupsourcing.
Event Sourcing Fundamentals¶
Definition¶
Event sourcing stores state changes as an immutable sequence of events rather than maintaining only current state. Instead of updating records (CRUD), the system appends events that describe what happened.
Traditional CRUD:
User table:
| id | email | name | status |
| 1 | alice@example.com | Alice | active |
# UPDATE loses history
UPDATE user SET email='new@email.com' WHERE id=1
Event Sourcing:
Events (append-only):
1. UserCreated(id=1, email=alice@example.com, name=Alice)
2. EmailVerified(id=1)
3. EmailChanged(id=1, old=alice@example.com, new=alice@newdomain.com)
4. UserDeactivated(id=1, reason="account closed")
Current state = Apply events 1-4 in sequence
Historical state = Apply events up to specific point in time
Benefits¶
- Complete Audit Trail - Full history of all changes for compliance and debugging
- Temporal Queries - Reconstruct state at any point in time
- Flexible Read Models - Build new projections from existing events without migrations
- Event Replay - Reprocess historical events for debugging or new features
- Business Intelligence - Rich analytical capabilities from event history
Trade-offs¶
Advantages:
- Complete historical record of all state changes
- Flexible read models without migrations
- Natural audit logging for compliance
- Temporal query capabilities
- Effective debugging through event replay
Considerations:
- Higher complexity than simple CRUD
- Learning curve for team members
- Projections must handle idempotency
- Eventual consistency in read models
- Storage growth over time (mitigated by snapshots)
- Schema evolution for immutable events
When to Use:
- Systems requiring audit trails (financial, healthcare, legal)
- Complex business domains
- Applications needing temporal queries
- Microservices publishing domain events
- Multiple read models from same data
When to Avoid:
- Simple CRUD applications
- Prototypes without event sourcing requirements
- Teams lacking event sourcing experience
- Systems requiring strict low-latency everywhere
Bounded Contexts¶
pupsourcing requires all events to belong to a bounded context, supporting Domain-Driven Design (DDD) principles. A bounded context is an explicit boundary within which a domain model is defined and applicable.
Why Bounded Contexts?¶
- Domain Isolation: Different parts of your system (e.g., Identity, Billing, Catalog) can evolve independently
- Clear Boundaries: Events are explicitly scoped, preventing accidental mixing of concerns
- Flexible Projections: Scoped projections can filter by both aggregate type and bounded context
- Uniqueness: Event uniqueness is enforced per (BoundedContext, AggregateType, AggregateID, AggregateVersion)
- Scalability: Partition event store tables by bounded context for improved performance
- Retention Policies: Different contexts can have different data retention requirements
Example Contexts¶
// Identity context - user management
identityEvent := es.Event{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: userID,
EventType: "UserCreated",
// ...
}
// Billing context - subscription management
billingEvent := es.Event{
BoundedContext: "Billing",
AggregateType: "Subscription",
AggregateID: subscriptionID,
EventType: "SubscriptionStarted",
// ...
}
// Catalog context - product information
catalogEvent := es.Event{
BoundedContext: "Catalog",
AggregateType: "Product",
AggregateID: productID,
EventType: "ProductAdded",
// ...
}
Choosing Bounded Contexts¶
Bounded contexts should align with your business domains and organizational structure. Common examples:
- Identity: User accounts, authentication, profiles
- Billing: Subscriptions, payments, invoicing
- Catalog: Products, categories, inventory
- Fulfillment: Orders, shipping, tracking
- Analytics: Usage tracking, metrics, reporting
Each context should represent a distinct area of the business with its own terminology and rules.
Core Components¶
1. Events¶
Events are immutable facts that have occurred in your system. They represent something that happened in the past and cannot be changed or deleted.
Key principles:
- Events are named in past tense: UserCreated, OrderPlaced, PaymentProcessed
- Events are immutable once persisted
- Events contain all data needed to understand what happened
- Events should be domain-focused, not technical
The Event struct represents an immutable domain event before it is stored. When you create events to append to the store, you populate this structure. The store then assigns the AggregateVersion and GlobalPosition when the event is persisted.
type Event struct {
CreatedAt time.Time
BoundedContext string // Bounded context this event belongs to (e.g., "Identity", "Billing")
AggregateType string // Type of aggregate (e.g., "User", "Order")
EventType string // Type of event (e.g., "UserCreated", "OrderPlaced")
AggregateID string // Aggregate instance identifier (UUID string, email, or any identifier)
Payload []byte // Event data (typically JSON)
Metadata []byte // Additional metadata (typically JSON)
EventVersion int // Schema version of this event type (default: 1)
CausationID es.NullString // ID of event/command that caused this event
CorrelationID es.NullString // Link related events across aggregates
TraceID es.NullString // Distributed tracing ID
EventID uuid.UUID // Unique event identifier
}
Note that AggregateVersion and GlobalPosition are not part of the Event struct because they are assigned by the store during the Append operation. These fields are only present in PersistedEvent.
Event vs. PersistedEvent¶
Event: Used when creating new events to append to the store. You populate all fields except AggregateVersion and GlobalPosition, which the store assigns automatically during persistence.
PersistedEvent: Returned after events are stored or when reading from the store. Contains all Event fields plus the store-assigned GlobalPosition and AggregateVersion.
type PersistedEvent struct {
CreatedAt time.Time
BoundedContext string
AggregateType string
EventType string
AggregateID string
Payload []byte
Metadata []byte
GlobalPosition int64 // Assigned by store - position in global event log
AggregateVersion int64 // Assigned by store - version within this aggregate
EventVersion int
CausationID es.NullString
CorrelationID es.NullString
TraceID es.NullString
EventID uuid.UUID
}
This separation ensures events are value objects until persisted. The store assigns both position in the global log and version within the aggregate, guaranteeing consistency.
Event Design Best Practices¶
✅ Good event names:
- OrderPlaced (not PlaceOrder - it already happened)
- PaymentCompleted (not Payment - be specific)
- UserEmailChanged (not UserUpdated - what exactly changed?)
❌ Bad event names:
- CreateUser (command, not event)
- Update (too generic)
- UserEvent (meaningless)
Event payload guidelines:
- Include all data needed to understand the event
- Don't include computed values that can be derived
- Use JSON for flexibility and readability
- Version your event schemas (EventVersion field; see the sketch after the examples below)
Example:
// ✅ Good: Includes all relevant data
{
"user_id": "123",
"old_email": "alice@old.com",
"new_email": "alice@new.com",
"changed_by": "user_456",
"reason": "user requested"
}
// ❌ Bad: Missing context
{
"email": "alice@new.com"
}
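Following the versioning guideline above, payloads can be defined as Go types so the schema is explicit in code. A minimal sketch, assuming a hypothetical UserEmailChangedV1 payload type and the usual imports (encoding/json, time, github.com/google/uuid):
// Hypothetical payload type for UserEmailChanged, schema version 1
type UserEmailChangedV1 struct {
UserID string `json:"user_id"`
OldEmail string `json:"old_email"`
NewEmail string `json:"new_email"`
Reason string `json:"reason"`
}
func newUserEmailChanged(userID, oldEmail, newEmail, reason string) (es.Event, error) {
payload, err := json.Marshal(UserEmailChangedV1{UserID: userID, OldEmail: oldEmail, NewEmail: newEmail, Reason: reason})
if err != nil {
return es.Event{}, err
}
return es.Event{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: userID,
EventType: "UserEmailChanged",
EventVersion: 1, // bump when the payload schema changes
Payload: payload,
Metadata: []byte(`{}`),
EventID: uuid.New(),
CreatedAt: time.Now(),
}, nil
}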
2. Aggregates¶
An aggregate is a cluster of related domain objects that are treated as a unit for data changes. In event sourcing, an aggregate is the primary unit of consistency.
Core principles:
- An aggregate is a consistency boundary
- All events for an aggregate are processed in order
- Aggregates are identified by BoundedContext + AggregateType + AggregateID
- Events within an aggregate are strictly ordered by AggregateVersion
Example: User Aggregate
// User aggregate - spans multiple events
aggregateID := uuid.New().String()
events := []es.Event{
{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: aggregateID,
EventType: "UserCreated",
EventVersion: 1,
Payload: []byte(`{"email":"alice@example.com"}`),
Metadata: []byte(`{}`),
EventID: uuid.New(),
CreatedAt: time.Now(),
},
{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: aggregateID, // Same aggregate
EventType: "EmailVerified",
EventVersion: 1,
Payload: []byte(`{}`),
Metadata: []byte(`{}`),
EventID: uuid.New(),
CreatedAt: time.Now(),
},
}
Key principle: All events for the same aggregate are processed in order.
3. Event Store¶
The event store is an append-only log of all events, providing atomic append operations with optimistic concurrency control.
type EventStore interface {
// Append events atomically with version control
Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)
}
Properties:
- Append-only (events are never modified or deleted)
- Globally ordered (via global_position)
- Transactional (uses provided transaction)
- Optimistic concurrency via the expectedVersion parameter (see the sketch below)
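Both expected-version helpers used throughout this guide pair with Append; a minimal sketch (error handling elided):
// New aggregate: expect that no events exist yet for this stream
_, err := store.Append(ctx, tx, es.NoStream(), firstEvents)
// Existing aggregate: expect the stream to be at exactly version 3
_, err = store.Append(ctx, tx, es.Exact(3), nextEvents)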
4. Projections¶
Projections transform events into read models (materialized views), enabling flexible query patterns. Because projections process events asynchronously, read models are eventually consistent.
Two types of projections:
- Scoped Projections - Filter by aggregate type and bounded context (for read models; a sketch follows the interfaces below):
type ScopedProjection interface {
Projection
AggregateTypes() []string // Filter by aggregate type
BoundedContexts() []string // Filter by bounded context
}
- Global Projections - Receive all events (for integration/audit):
type Projection interface {
Name() string
Handle(ctx context.Context, event es.PersistedEvent) error
}
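As an illustration, here is a minimal scoped projection. The interface methods are the documented ones; the user_emails table and the db handle held by the struct are assumptions for this sketch:
// Hypothetical read model: projects Identity/User events into a user_emails table
type UserEmailProjection struct {
db *sql.DB // assumption: the projection holds a handle for its writes
}
func (p *UserEmailProjection) Name() string { return "user_emails" }
func (p *UserEmailProjection) AggregateTypes() []string { return []string{"User"} }
func (p *UserEmailProjection) BoundedContexts() []string { return []string{"Identity"} }
func (p *UserEmailProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
if event.EventType != "UserCreated" {
return nil // ignore event types this read model does not use
}
var payload struct {
Email string `json:"email"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
// Upsert keeps the handler idempotent (see Idempotency below)
_, err := p.db.ExecContext(ctx,
`INSERT INTO user_emails (aggregate_id, email) VALUES ($1, $2)
ON CONFLICT (aggregate_id) DO UPDATE SET email = EXCLUDED.email`,
event.AggregateID, payload.Email)
return err
}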
Projection lifecycle:
1. Read batch of events from store starting from checkpoint
2. Apply partition filter (for horizontal scaling)
3. Apply aggregate type filter (for scoped projections)
4. Call Handle() for each event within a transaction
5. Update checkpoint atomically
6. Commit transaction
7. Repeat until context is cancelled or error occurs
5. Checkpoints¶
Checkpoints track how far a projection has processed in the global event log.
CREATE TABLE projection_checkpoints (
projection_name TEXT PRIMARY KEY,
last_global_position BIGINT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);
Key features:
- One checkpoint per projection
- Updated atomically with event processing (see the sketch below)
- Enables resumable processing
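The library maintains checkpoints internally; to illustrate the mechanism, the atomic update amounts to an upsert against the schema above. A sketch, not the library's actual SQL:
// Sketch: advance the checkpoint in the same transaction that applied the batch
_, err := tx.ExecContext(ctx,
`INSERT INTO projection_checkpoints (projection_name, last_global_position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (projection_name)
DO UPDATE SET last_global_position = EXCLUDED.last_global_position, updated_at = EXCLUDED.updated_at`,
"user_emails", lastPosition)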
Key Concepts¶
Optimistic Concurrency¶
pupsourcing uses optimistic concurrency control to prevent conflicts.
// Transaction 1
tx1, _ := db.BeginTx(ctx, nil)
store.Append(ctx, tx1, es.Exact(currentVersion), events1) // Success
tx1.Commit()
// Transaction 2 (concurrent)
tx2, _ := db.BeginTx(ctx, nil)
store.Append(ctx, tx2, es.Exact(currentVersion), events2) // ErrOptimisticConcurrency
tx2.Rollback()
How it works:
1. Each aggregate has a current version in aggregate_heads table
2. When appending, version is checked (O(1) lookup)
3. New events get consecutive versions
4. Database constraint enforces uniqueness: (bounded_context, aggregate_type, aggregate_id, aggregate_version)
5. If another transaction committed between check and insert → conflict
Handling conflicts:
for retries := 0; retries < maxRetries; retries++ {
tx, _ := db.BeginTx(ctx, nil)
_, err := store.Append(ctx, tx, es.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
tx.Rollback()
// Reload the aggregate to refresh currentVersion, reapply the command
// to produce new events, then retry
continue
}
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
return fmt.Errorf("append failed after %d retries", maxRetries)
Global Position¶
Every event gets a unique, monotonically increasing position.
Event 1 → global_position = 1
Event 2 → global_position = 2
Event 3 → global_position = 3
...
Uses:
- Checkpoint tracking (see the query sketch below)
- Event replay
- Ordered processing
- Temporal queries
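To illustrate checkpoint tracking, here is the kind of query a reader performs. The events table name and columns are assumptions for this sketch; the real schema comes from the library's migrations:
// Read the next batch after a checkpoint, in global order
rows, err := tx.QueryContext(ctx,
`SELECT global_position, event_type, payload
FROM events
WHERE global_position > $1
ORDER BY global_position
LIMIT $2`,
checkpoint, batchSize)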
Aggregate Versioning¶
Each aggregate has its own version sequence.
User ABC:
Event 1 → aggregate_version = 1 (UserCreated)
Event 2 → aggregate_version = 2 (EmailVerified)
Event 3 → aggregate_version = 3 (NameChanged)
User XYZ:
Event 1 → aggregate_version = 1 (UserCreated)
Event 2 → aggregate_version = 2 (Deactivated)
Uses:
- Optimistic concurrency
- Event replay
- Aggregate reconstruction
Idempotency¶
Projections must be idempotent because events may be reprocessed during crash recovery or restarts: processing the same event multiple times must produce the same result as processing it once. The examples below assume tx is the database transaction for the current batch; how the projection obtains it depends on your setup.
Non-idempotent (problematic):
func (p *Projection) Handle(ctx context.Context, event es.PersistedEvent) error {
if event.EventType != "OrderPlaced" {
return nil
}
// Problem: Running this twice increments the counter twice for the same event
_, err := tx.ExecContext(ctx,
"UPDATE sales_statistics SET total_orders = total_orders + 1 WHERE date = CURRENT_DATE")
return err
}
Idempotent approach 1: Track processed events explicitly
func (p *Projection) Handle(ctx context.Context, event es.PersistedEvent) error {
if event.EventType != "OrderPlaced" {
return nil
}
// Check if we've already processed this event
var exists bool
err := tx.QueryRowContext(ctx,
"SELECT EXISTS(SELECT 1 FROM order_events_processed WHERE event_id = $1)",
event.EventID).Scan(&exists)
if err != nil {
return err
}
if exists {
return nil // Already processed, skip
}
// Process the event
_, err = tx.ExecContext(ctx,
"UPDATE sales_statistics SET total_orders = total_orders + 1 WHERE date = CURRENT_DATE")
if err != nil {
return err
}
// Mark event as processed
_, err = tx.ExecContext(ctx,
"INSERT INTO order_events_processed (event_id, processed_at) VALUES ($1, NOW())",
event.EventID)
return err
}
Idempotent approach 2: Use upsert semantics
func (p *Projection) Handle(ctx context.Context, event es.PersistedEvent) error {
if event.EventType != "UserCreated" {
return nil
}
var payload struct {
Email string `json:"email"`
Name string `json:"name"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
// Use INSERT ... ON CONFLICT to make this idempotent
// If aggregate_id already exists, update with the same values
_, err := tx.ExecContext(ctx,
`INSERT INTO users (aggregate_id, email, name, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id)
DO UPDATE SET email = EXCLUDED.email, name = EXCLUDED.name`,
event.AggregateID, payload.Email, payload.Name, event.CreatedAt)
return err
}
Idempotent approach 3: Use event position as version
func (p *Projection) Handle(ctx context.Context, event es.PersistedEvent) error {
if event.EventType != "InventoryAdjusted" {
return nil
}
var payload struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
// Only apply if this event's position is greater than last processed position for this product
result, err := tx.ExecContext(ctx,
`UPDATE inventory
SET quantity = quantity + $1, last_event_position = $2
WHERE product_id = $3 AND (last_event_position IS NULL OR last_event_position < $2)`,
payload.Quantity, event.GlobalPosition, payload.ProductID)
if err != nil {
return err
}
// No rows updated: either the product doesn't exist yet, or this event
// was already applied (the INSERT below is then a no-op)
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
_, err = tx.ExecContext(ctx,
`INSERT INTO inventory (product_id, quantity, last_event_position)
VALUES ($1, $2, $3)
ON CONFLICT (product_id) DO NOTHING`, // Race condition safety
payload.ProductID, payload.Quantity, event.GlobalPosition)
}
return err
}
Transaction Boundaries¶
You control transactions, not the library.
// Your responsibility: begin transaction
tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()
// Library uses your transaction
result, err := store.Append(ctx, tx, es.NoStream(), events)
if err != nil {
return err // Rollback happens in defer
}
// Your responsibility: commit
return tx.Commit()
Benefits:
- Compose operations atomically (see the sketch below)
- Control isolation levels
- Integrate with existing code
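For example, composing operations atomically might look like the following sketch; the notifications table is a hypothetical outbox-style target:
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Append the domain event...
if _, err := store.Append(ctx, tx, es.Exact(currentVersion), events); err != nil {
return err
}
// ...and enqueue a notification in the same transaction (hypothetical table)
if _, err := tx.ExecContext(ctx,
`INSERT INTO notifications (kind, body) VALUES ($1, $2)`,
"email", body); err != nil {
return err
}
// Either both writes commit, or neither does
return tx.Commit()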
Design Principles¶
1. Library, Not Framework¶
pupsourcing is designed as a library that you integrate into your application. Your code controls the flow and calls library functions when needed.
Library approach (pupsourcing):
// You control when and how to run projections
processor := projection.NewProcessor(db, store, &config)
err := processor.Run(ctx, projection)
Framework approach (not pupsourcing):
// Framework discovers your code via reflection or annotations
@EventHandler
public void on(UserCreated event) { }
Benefits: You maintain full control over program flow, dependencies, and lifecycle management.
2. Explicit Dependencies¶
All dependencies are passed explicitly as parameters. There are no hidden globals, automatic dependency injection, or runtime discovery mechanisms.
Explicit dependencies (pupsourcing):
// Every dependency is visible at the call site
store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)
r := runner.New()
err := r.Run(ctx, []runner.ProjectionRunner{
{Projection: proj1, Processor: processor},
})
Implicit dependencies (not pupsourcing):
// Where do projections come from? Service locator? Global registry?
runner.Start() // Unclear what this will execute
Benefits: Code is easier to understand, test, and debug when all dependencies are explicit.
3. Pull-Based Event Processing¶
Projections actively read events from the store at their own pace. The library does not use publish-subscribe patterns or push-based delivery.
Pull-based processing (pupsourcing):
// Simplified illustration of a projection processing loop
for {
events := store.ReadEvents(ctx, tx, checkpoint, batchSize)
for _, event := range events {
projection.Handle(ctx, event)
}
// Update checkpoint and commit
}
Benefits:
- Natural backpressure mechanism (slow projections won't overwhelm the system)
- No connection pooling or message broker management required
- Works consistently across different storage backends
- Simple failure recovery (resume from checkpoint)
4. Database-Centric Coordination¶
The database serves as the coordination mechanism. No external distributed systems are required for basic operation.
Database provides:
- Checkpoints: Each projection tracks its position via a database row
- Optimistic concurrency: Enforced through unique constraints on aggregate versions
- Transactional consistency: Operations are atomic within database transactions
This approach keeps infrastructure requirements minimal while providing reliable coordination.
Common Patterns¶
Pattern 1: Read-Your-Writes¶
Write events and read them back within the same transaction for immediate consistency.
// Write event
tx, _ := db.BeginTx(ctx, nil)
aggregateID := uuid.New().String()
events := []es.Event{
{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: aggregateID,
EventType: "UserCreated",
EventVersion: 1,
Payload: []byte(`{"email":"user@example.com"}`),
Metadata: []byte(`{}`),
EventID: uuid.New(),
CreatedAt: time.Now(),
},
}
store.Append(ctx, tx, es.NoStream(), events)
// Read immediately within same transaction
stream, _ := store.ReadAggregateStream(ctx, tx, "Identity", "User", aggregateID, nil, nil)
tx.Commit()
Pattern 2: Event Upcasting¶
Handle different event versions:
func (p *Projection) Handle(ctx context.Context, event es.PersistedEvent) error {
switch event.EventType {
case "UserCreated":
switch event.EventVersion {
case 1:
return p.handleUserCreatedV1(event)
case 2:
return p.handleUserCreatedV2(event)
}
}
return nil
}
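A sketch of the version-specific handlers, assuming a hypothetical schema change in which V2 split name into first and last names; p.upsertUser is a hypothetical shared helper:
// V1 payload: a single "name" field (hypothetical schema)
func (p *Projection) handleUserCreatedV1(event es.PersistedEvent) error {
var payload struct {
Email string `json:"email"`
Name string `json:"name"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
return p.upsertUser(event.AggregateID, payload.Email, payload.Name)
}
// V2 payload: "name" was split into first/last (hypothetical schema)
func (p *Projection) handleUserCreatedV2(event es.PersistedEvent) error {
var payload struct {
Email string `json:"email"`
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return err
}
// Upcast to one internal representation for downstream logic
return p.upsertUser(event.AggregateID, payload.Email, payload.FirstName+" "+payload.LastName)
}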
Pattern 3: Aggregate Reconstruction¶
Rebuild aggregate state from its event history.
type User struct {
ID string
Email string
Name string
Active bool
}
func (u *User) Apply(event es.PersistedEvent) {
switch event.EventType {
case "UserCreated":
var payload struct {
Email string `json:"email"`
Name string `json:"name"`
}
json.Unmarshal(event.Payload, &payload)
u.Email = payload.Email
u.Name = payload.Name
u.Active = true
case "UserDeactivated":
u.Active = false
case "EmailChanged":
var payload struct {
NewEmail string `json:"new_email"`
}
json.Unmarshal(event.Payload, &payload)
u.Email = payload.NewEmail
}
}
func LoadUser(ctx context.Context, tx es.DBTX, reader store.AggregateStreamReader, id string) (*User, error) {
stream, err := reader.ReadAggregateStream(ctx, tx, "Identity", "User", id, nil, nil)
if err != nil {
return nil, err
}
if stream.IsEmpty() {
return nil, fmt.Errorf("user not found: %s", id)
}
user := &User{ID: id}
for _, event := range stream.Events {
user.Apply(event)
}
return user, nil
}
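Reconstruction pairs naturally with optimistic concurrency: load the aggregate, enforce invariants, then append with es.Exact at the version you observed. A sketch using the interfaces shown earlier; ChangeEmail and its wiring are hypothetical:
func ChangeEmail(ctx context.Context, db *sql.DB, reader store.AggregateStreamReader, eventStore store.EventStore, userID, newEmail string) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Rebuild current state and note the version we observed
stream, err := reader.ReadAggregateStream(ctx, tx, "Identity", "User", userID, nil, nil)
if err != nil {
return err
}
if stream.IsEmpty() {
return fmt.Errorf("user not found: %s", userID)
}
user := &User{ID: userID}
var version int64
for _, e := range stream.Events {
user.Apply(e)
version = e.AggregateVersion
}
// Enforce an invariant against the reconstructed state
if !user.Active {
return fmt.Errorf("cannot change email of deactivated user %s", userID)
}
payload, err := json.Marshal(map[string]string{"old_email": user.Email, "new_email": newEmail})
if err != nil {
return err
}
// Append at the observed version; a concurrent writer causes
// ErrOptimisticConcurrency and the command can be retried
_, err = eventStore.Append(ctx, tx, es.Exact(version), []es.Event{{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: userID,
EventType: "EmailChanged",
EventVersion: 1,
Payload: payload,
Metadata: []byte(`{}`),
EventID: uuid.New(),
CreatedAt: time.Now(),
}})
if err != nil {
return err
}
return tx.Commit()
}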
See Also¶
- Getting Started - Setup and first steps
- Scaling Guide - Production patterns
- API Reference - Complete API docs
- Examples - Working code examples