API Reference

Complete API documentation for pupsourcing.

Table of Contents

  1. Core Types
  2. Observability
  3. Event Store
  4. Projections
  5. Runner Package
  6. PostgreSQL Adapter
  7. Error Types

Core Types

es.Event

Represents an immutable domain event before persistence.

type Event struct {
    CreatedAt      time.Time
    BoundedContext string          // Bounded context this event belongs to (e.g., "Identity", "Billing")
    AggregateType  string          // Type of aggregate (e.g., "User", "Order")
    EventType      string          // Type of event (e.g., "UserCreated")
    AggregateID    string          // Aggregate instance identifier (UUID string, email, or any identifier)
    Payload        []byte          // Event data (typically JSON)
    Metadata       []byte          // Additional metadata (typically JSON)
    EventVersion   int             // Schema version of this event type (default: 1)
    CausationID    es.NullString  // ID of event/command that caused this event
    CorrelationID  es.NullString  // Links related events across aggregates
    TraceID        es.NullString  // Distributed tracing ID
    EventID        uuid.UUID       // Unique event identifier
}

Note: AggregateVersion and GlobalPosition are assigned by the store during Append. The field order is optimized for memory layout.

es.ExpectedVersion

Controls optimistic concurrency for aggregate updates.

type ExpectedVersion struct {
    // internal value
}

// Constructors
func Any() ExpectedVersion         // No version check
func NoStream() ExpectedVersion    // Aggregate must not exist
func Exact(version int64) ExpectedVersion  // Aggregate must be at specific version

Usage:

  • Any(): Skip version validation. Use when you don't need concurrency control.
  • NoStream(): Enforce that the aggregate doesn't exist. Use for aggregate creation and uniqueness enforcement.
  • Exact(N): Enforce that the aggregate is at version N. Use for normal command handling with optimistic concurrency.

Examples:

// Creating a new aggregate
_, err := store.Append(ctx, tx, es.NoStream(), []es.Event{event})

// Updating an existing aggregate at version 5
_, err := store.Append(ctx, tx, es.Exact(5), []es.Event{event})

// No concurrency check
_, err := store.Append(ctx, tx, es.Any(), []es.Event{event})

// Uniqueness enforcement via reservation aggregate
email := "user@example.com"
reservationEvent := es.Event{
    BoundedContext: "Identity",
    AggregateType:  "EmailReservation",
    AggregateID:    email,  // Use email as aggregate ID
    // ... other fields
}
_, err := store.Append(ctx, tx, es.NoStream(), []es.Event{reservationEvent})
// Second attempt with same email will fail with ErrOptimisticConcurrency
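The three expectations can be read as a small decision table over the aggregate's current version. The following is a minimal sketch of that table, not the library's implementation: the names (expectedVersion, check, errConflict) are hypothetical stand-ins, and it assumes that a current version of 0 means the aggregate has no events yet.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict stands in for store.ErrOptimisticConcurrency (hypothetical local copy).
var errConflict = errors.New("optimistic concurrency conflict")

type mode int

const (
	modeAny mode = iota
	modeNoStream
	modeExact
)

// expectedVersion is a hypothetical stand-in for es.ExpectedVersion.
type expectedVersion struct {
	mode    mode
	version int64
}

func anyVersion() expectedVersion   { return expectedVersion{mode: modeAny} }
func noStream() expectedVersion     { return expectedVersion{mode: modeNoStream} }
func exact(v int64) expectedVersion { return expectedVersion{mode: modeExact, version: v} }

// check compares an expectation with the aggregate's current version.
// Assumption: current == 0 means no events have been stored for the aggregate.
func check(exp expectedVersion, current int64) error {
	switch exp.mode {
	case modeNoStream:
		if current != 0 {
			return errConflict
		}
	case modeExact:
		if current != exp.version {
			return errConflict
		}
	}
	return nil // modeAny never conflicts
}

// conflicts reports whether the expectation is violated.
func conflicts(exp expectedVersion, current int64) bool {
	return errors.Is(check(exp, current), errConflict)
}

func main() {
	fmt.Println(conflicts(noStream(), 0)) // new aggregate: allowed
	fmt.Println(conflicts(noStream(), 3)) // aggregate already exists: conflict
	fmt.Println(conflicts(exact(5), 6))   // another writer got there first: conflict
	fmt.Println(conflicts(anyVersion(), 9))
}
```

Under these assumptions the reservation pattern above works because the second NoStream() append sees a non-zero version and is rejected.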

es.PersistedEvent

Represents an event that has been stored, including position information.

type PersistedEvent struct {
    CreatedAt        time.Time
    BoundedContext   string
    AggregateType    string
    EventType        string
    AggregateID      string
    Payload          []byte
    Metadata         []byte
    GlobalPosition   int64       // Position in global event log (assigned by store)
    AggregateVersion int64       // Version of aggregate after this event (assigned by store)
    EventVersion     int
    CausationID      es.NullString
    CorrelationID    es.NullString
    TraceID          es.NullString
    EventID          uuid.UUID
}

Note: The field order is optimized for memory layout.

es.DBTX

Database transaction interface used throughout the library.

type DBTX interface {
    ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
    QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
    QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

Implemented by both *sql.DB and *sql.Tx.
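Because both types satisfy the interface, a helper can be written once and called with either a connection pool or an open transaction. The sketch below redeclares the interface locally so that it compiles on its own; the compile-time assertions and the describe helper are illustrative and not part of pupsourcing.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// DBTX is a local copy of the es.DBTX interface shown above.
type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

// Compile-time checks: the build fails if either type stops satisfying DBTX.
var (
	_ DBTX = (*sql.DB)(nil)
	_ DBTX = (*sql.Tx)(nil)
)

// describe is a hypothetical helper that reports which concrete type it received.
func describe(q DBTX) string {
	switch q.(type) {
	case *sql.DB:
		return "connection pool"
	case *sql.Tx:
		return "transaction"
	default:
		return "custom implementation"
	}
}

// demo passes typed nil pointers; describe only inspects the dynamic type.
func demo() (string, string) {
	return describe((*sql.DB)(nil)), describe((*sql.Tx)(nil))
}

func main() {
	pool, tx := demo()
	fmt.Println(pool)
	fmt.Println(tx)
}
```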

Observability

es.Logger

Optional logging interface for instrumenting the library.

type Logger interface {
    Debug(ctx context.Context, msg string, keyvals ...interface{})
    Info(ctx context.Context, msg string, keyvals ...interface{})
    Error(ctx context.Context, msg string, keyvals ...interface{})
}

Usage:

type MyLogger struct {
    logger *slog.Logger
}

func (l *MyLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.DebugContext(ctx, msg, keyvals...)
}

func (l *MyLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.InfoContext(ctx, msg, keyvals...)
}

func (l *MyLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.ErrorContext(ctx, msg, keyvals...)
}

// Inject into store
config := postgres.DefaultStoreConfig()
config.Logger = &MyLogger{logger: slog.Default()}
store := postgres.NewStore(config)

See the Observability Guide for complete documentation and examples.

es.NoOpLogger

Default logger implementation that does nothing. Used internally when no logger is configured.

type NoOpLogger struct{}

func (NoOpLogger) Debug(_ context.Context, _ string, _ ...interface{}) {}
func (NoOpLogger) Info(_ context.Context, _ string, _ ...interface{}) {}
func (NoOpLogger) Error(_ context.Context, _ string, _ ...interface{}) {}
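For tests it can be handy to swap the no-op logger for one that records what was logged. The sketch below shows one way to do that against a local copy of the Logger interface; recordingLogger and the log messages are hypothetical, and the library's actual messages and key names are not shown here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Logger is a local copy of the es.Logger interface shown above.
type Logger interface {
	Debug(ctx context.Context, msg string, keyvals ...interface{})
	Info(ctx context.Context, msg string, keyvals ...interface{})
	Error(ctx context.Context, msg string, keyvals ...interface{})
}

// recordingLogger (hypothetical) stores formatted entries so a test can assert on them.
type recordingLogger struct {
	mu      sync.Mutex
	entries []string
}

// record formats one entry; the mutex makes it safe to call from several goroutines.
func (l *recordingLogger) record(level, msg string, keyvals []interface{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.entries = append(l.entries, fmt.Sprintf("%s %s %v", level, msg, keyvals))
}

func (l *recordingLogger) Debug(_ context.Context, msg string, keyvals ...interface{}) {
	l.record("DEBUG", msg, keyvals)
}

func (l *recordingLogger) Info(_ context.Context, msg string, keyvals ...interface{}) {
	l.record("INFO", msg, keyvals)
}

func (l *recordingLogger) Error(_ context.Context, msg string, keyvals ...interface{}) {
	l.record("ERROR", msg, keyvals)
}

// Compile-time check that recordingLogger satisfies Logger.
var _ Logger = (*recordingLogger)(nil)

// demo logs two made-up messages through the interface and returns what was recorded.
func demo() []string {
	rec := &recordingLogger{}
	var log Logger = rec
	log.Info(context.Background(), "append ok", "events", 2)
	log.Error(context.Background(), "append failed", "attempt", 1)
	return rec.entries
}

func main() {
	for _, entry := range demo() {
		fmt.Println(entry)
	}
}
```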

Event Store

store.EventStore

Interface for appending events.

type EventStore interface {
    Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)
}

Append

Atomically appends events within a transaction with optimistic concurrency control.

func (s *Store) Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)

Parameters:

  • ctx: Context for cancellation
  • tx: Database transaction (you control transaction boundaries)
  • expectedVersion: Expected aggregate version (Any, NoStream, or Exact)
  • events: Events to append (must all be for the same aggregate)

Returns:

  • es.AppendResult: Result containing persisted events and global positions
      • Events: The persisted events with assigned versions
      • GlobalPositions: Assigned global positions for the events
      • Helper methods: FromVersion(), ToVersion()
  • error: Error if any (including ErrOptimisticConcurrency)

Errors:

  • store.ErrOptimisticConcurrency: Version conflict or expectation mismatch
  • store.ErrNoEvents: Empty events slice

Example:

// eventStore is the *postgres.Store; the name avoids shadowing the store package.
tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback() // Harmless after a successful Commit

// Create a new aggregate
result, err := eventStore.Append(ctx, tx, es.NoStream(), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
    // Aggregate already exists
}
if err != nil {
    return err
}

fmt.Printf("Aggregate version: %d\n", result.ToVersion())
fmt.Printf("Positions: %v\n", result.GlobalPositions)

// Update existing aggregate at version 3
result, err = eventStore.Append(ctx, tx, es.Exact(3), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
    // Version mismatch - another transaction updated the aggregate
    // Reload aggregate state and retry
}
if err != nil {
    return err
}

return tx.Commit()
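The "reload and retry" step mentioned in the example is usually wrapped in a bounded loop. The following is a sketch of such a loop using only the standard library: errConflict stands in for store.ErrOptimisticConcurrency, and retryOnConflict and simulate are hypothetical helpers, not part of pupsourcing. In real code each attempt would begin a fresh transaction, reload the aggregate, and call Append with the reloaded version.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict stands in for store.ErrOptimisticConcurrency (hypothetical local copy).
var errConflict = errors.New("optimistic concurrency conflict")

// retryOnConflict calls attempt up to maxAttempts times. It retries only when
// attempt reports a version conflict; any other result is returned immediately.
// It returns the number of attempts made and the last error.
func retryOnConflict(maxAttempts int, attempt func() error) (int, error) {
	var err error
	for i := 1; i <= maxAttempts; i++ {
		err = attempt()
		if !errors.Is(err, errConflict) {
			return i, err // success or a non-retryable error
		}
	}
	return maxAttempts, err
}

// simulate makes the first `failures` attempts conflict and the rest succeed.
func simulate(failures, maxAttempts int) (attempts int, ok bool) {
	calls := 0
	n, err := retryOnConflict(maxAttempts, func() error {
		calls++
		if calls <= failures {
			// Wrapped errors still match via errors.Is.
			return fmt.Errorf("append: %w", errConflict)
		}
		return nil
	})
	return n, err == nil
}

func main() {
	attempts, ok := simulate(2, 5)
	fmt.Println(attempts, ok)
}
```

Bounding the number of attempts keeps a hot aggregate from spinning forever; what to do when the budget is exhausted is left to the caller.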

store.EventReader

Interface for reading events sequentially.

type EventReader interface {
    ReadEvents(ctx context.Context, tx es.DBTX, fromPosition int64, limit int) ([]es.PersistedEvent, error)
}

ReadEvents

Reads events starting from a position.

func (s *Store) ReadEvents(ctx context.Context, tx es.DBTX, fromPosition int64, limit int) ([]es.PersistedEvent, error)

Parameters:

  • ctx: Context
  • tx: Database transaction
  • fromPosition: Start position (exclusive: returns events AFTER this position)
  • limit: Maximum number of events to return

Returns:

  • []es.PersistedEvent: Events ordered by global_position
  • error: Error if any
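Since fromPosition is exclusive, the position of the last event in a batch can be fed straight back in as the next cursor. The sketch below demonstrates that paging loop against an in-memory log; event, readEvents, and readAll are hypothetical stand-ins that only mimic the documented contract, and starting the cursor at 0 assumes that stored positions are greater than zero.

```go
package main

import "fmt"

// event is a stand-in for es.PersistedEvent that carries only the position.
type event struct{ GlobalPosition int64 }

// readEvents mimics the documented contract on an in-memory, position-ordered log:
// it returns at most limit events whose GlobalPosition is greater than fromPosition.
func readEvents(log []event, fromPosition int64, limit int) []event {
	var out []event
	for _, e := range log {
		if e.GlobalPosition > fromPosition && len(out) < limit {
			out = append(out, e)
		}
	}
	return out
}

// readAll pages through the log, using the last position of each batch as the
// next (exclusive) cursor, and stops at the first empty batch.
func readAll(log []event, batchSize int) (positions []int64, batches int) {
	var cursor int64 // assumption: 0 sorts before every stored position
	for {
		batch := readEvents(log, cursor, batchSize)
		if len(batch) == 0 {
			return positions, batches
		}
		batches++
		for _, e := range batch {
			positions = append(positions, e.GlobalPosition)
		}
		cursor = batch[len(batch)-1].GlobalPosition
	}
}

func main() {
	log := []event{{1}, {2}, {3}, {4}, {5}}
	positions, batches := readAll(log, 2)
	fmt.Println(positions, batches)
}
```

Using the last seen position rather than cursor + batchSize keeps the loop correct even if positions have gaps.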

store.AggregateStreamReader

Interface for reading events for a specific aggregate.

type AggregateStreamReader interface {
    ReadAggregateStream(ctx context.Context, tx es.DBTX, aggregateType string, 
                       aggregateID string, fromVersion, toVersion *int64) (es.Stream, error)
}

ReadAggregateStream

Reads all events for an aggregate, optionally filtered by version range.

func (s *Store) ReadAggregateStream(ctx context.Context, tx es.DBTX, 
                                   aggregateType string, aggregateID string,
                                   fromVersion, toVersion *int64) (es.Stream, error)

Parameters:

  • aggregateType: Type of aggregate (e.g., "User")
  • aggregateID: Aggregate instance ID (string: UUID, email, or any identifier)
  • fromVersion: Optional minimum version (inclusive). Pass nil for all.
  • toVersion: Optional maximum version (inclusive). Pass nil for all.

Returns:

  • es.Stream: Stream containing:
      • AggregateType: The aggregate type
      • AggregateID: The aggregate ID
      • Events: Events ordered by aggregate_version
      • Helper methods: Version(), IsEmpty(), Len()
  • error: Error if any

Examples:

// Read all events for UUID-based aggregate
userID := uuid.New().String()
stream, err := store.ReadAggregateStream(ctx, tx, "User", userID, nil, nil)
if err != nil {
    return err
}
fmt.Printf("Aggregate version: %d\n", stream.Version())
fmt.Printf("Event count: %d\n", stream.Len())

// Process events
for _, event := range stream.Events {
    // Handle event
}

// The calls below reuse stream and err; error checks are omitted for brevity.

// Read from version 5 onwards
from := int64(5)
stream, err = store.ReadAggregateStream(ctx, tx, "User", userID, &from, nil)

// Read specific range
to := int64(10)
stream, err = store.ReadAggregateStream(ctx, tx, "User", userID, &from, &to)

// Read reservation aggregate by email
stream, err = store.ReadAggregateStream(ctx, tx, "EmailReservation", "user@example.com", nil, nil)

// Check if aggregate exists
if stream.IsEmpty() {
    // Aggregate has no events
}
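A common use of an aggregate stream is to rebuild current state by applying its events in version order. The sketch below shows that fold with standard-library types only: persistedEvent and user are hypothetical stand-ins, the JSON payload shape is made up, and "UserDeactivated" is an invented event type used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// persistedEvent is a reduced stand-in for es.PersistedEvent.
type persistedEvent struct {
	EventType        string
	AggregateVersion int64
	Payload          []byte
}

// user is a hypothetical aggregate rebuilt from its events.
type user struct {
	Email   string
	Active  bool
	Version int64
}

// apply mutates the aggregate for one event and records the event's version.
func (u *user) apply(e persistedEvent) error {
	switch e.EventType {
	case "UserCreated":
		var p struct {
			Email string `json:"email"`
		}
		if err := json.Unmarshal(e.Payload, &p); err != nil {
			return fmt.Errorf("decode %s: %w", e.EventType, err)
		}
		u.Email = p.Email
		u.Active = true
	case "UserDeactivated":
		u.Active = false
	}
	u.Version = e.AggregateVersion
	return nil
}

// rehydrate folds events (assumed ordered by aggregate version) into a user.
func rehydrate(events []persistedEvent) (user, error) {
	var u user
	for _, e := range events {
		if err := u.apply(e); err != nil {
			return user{}, err
		}
	}
	return u, nil
}

// demo rebuilds a user from two illustrative events.
func demo() user {
	events := []persistedEvent{
		{EventType: "UserCreated", AggregateVersion: 1, Payload: []byte(`{"email":"user@example.com"}`)},
		{EventType: "UserDeactivated", AggregateVersion: 2},
	}
	u, err := rehydrate(events)
	if err != nil {
		panic(err)
	}
	return u
}

func main() {
	fmt.Printf("%+v\n", demo())
}
```

The version left on the rebuilt aggregate is the value a command handler would typically pass to es.Exact when appending follow-up events.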

Projections

Projections transform events into query-optimized read models. There are two types:

  1. Scoped Projections - Filter events by aggregate type (for read models)
  2. Global Projections - Receive all events (for integration publishers, audit logs)

projection.ScopedProjection

Optional interface for projections that only need specific aggregate types.

type ScopedProjection interface {
    Projection
    AggregateTypes() []string
}

When to Use

Use ScopedProjection for:

  • Read models for specific aggregates (e.g., user profile view)
  • Domain-specific denormalizations (e.g., order summary)
  • Search indexes for specific entity types

Use Projection (global) for:

  • Message broker integrations (Watermill, Kafka, RabbitMQ)
  • Outbox pattern implementations
  • Complete audit trails
  • Cross-aggregate analytics

AggregateTypes

Returns the list of aggregate types this projection processes.

func (p *UserReadModelProjection) AggregateTypes() []string {
    return []string{"User"}  // Only receives User events
}

func (p *OrderUserProjection) AggregateTypes() []string {
    return []string{"User", "Order"}  // Receives User and Order events
}

Behavior:

  • If the list is non-empty, only events matching these aggregate types are delivered to Handle()
  • If the list is empty, the projection receives all events (same as a global projection)
  • Filtering happens at the processor level, not in the handler (O(1) map lookup)
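The filtering rules above amount to a set-membership test. The following sketch shows one way to express them with a map used as a set; typeFilter and its methods are hypothetical and are not the processor's actual code.

```go
package main

import "fmt"

// typeFilter is a set of aggregate types; an empty set means "accept everything".
type typeFilter map[string]struct{}

// newTypeFilter builds the set once, e.g. from a projection's AggregateTypes().
func newTypeFilter(aggregateTypes []string) typeFilter {
	f := make(typeFilter, len(aggregateTypes))
	for _, t := range aggregateTypes {
		f[t] = struct{}{}
	}
	return f
}

// accepts is a constant-time lookup per event.
func (f typeFilter) accepts(aggregateType string) bool {
	if len(f) == 0 {
		return true // empty list behaves like a global projection
	}
	_, ok := f[aggregateType]
	return ok
}

func main() {
	scoped := newTypeFilter([]string{"User", "Order"})
	fmt.Println(scoped.accepts("User"))    // listed type
	fmt.Println(scoped.accepts("Invoice")) // unlisted type

	global := newTypeFilter(nil)
	fmt.Println(global.accepts("Invoice")) // empty filter accepts everything
}
```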

Example: Scoped Read Model

type UserReadModelProjection struct {
    db *sql.DB
}

func (p *UserReadModelProjection) Name() string {
    return "user_read_model"
}

func (p *UserReadModelProjection) AggregateTypes() []string {
    return []string{"User"}
}

func (p *UserReadModelProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
    // Only User events arrive here
    // Projection manages its own database operations
    switch event.EventType {
    case "UserCreated":
        // Update read model
    case "UserUpdated":
        // Update read model
    }
    return nil
}

projection.Projection

Interface for event projection handlers. Projections are storage-agnostic and can write to any destination (SQL, NoSQL, message brokers, search engines, etc.).

type Projection interface {
    Name() string
    Handle(ctx context.Context, event es.PersistedEvent) error
}

Breaking Change (v1.2.0): Removed tx es.DBTX parameter from Handle() method. Projections now manage their own persistence connections.

Example: Global Integration Publisher

type WatermillPublisher struct {
    publisher message.Publisher
}

func (p *WatermillPublisher) Name() string {
    return "system.integration.watermill.v1"
}

// No AggregateTypes() method - receives ALL events

func (p *WatermillPublisher) Handle(ctx context.Context, event es.PersistedEvent) error {
    // Receives all events - publish to message broker
    // Projection manages its own persistence (no transaction parameter)
    msg := message.NewMessage(event.EventID.String(), event.Payload)
    return p.publisher.Publish(event.EventType, msg)
}

Example: SQL Read Model Projection

type UserReadModelProjection struct {
    db *sql.DB  // Projection manages its own database connection
}

func (p *UserReadModelProjection) Name() string {
    return "user_read_model"
}

func (p *UserReadModelProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
    // Projection manages its own database operations
    if event.EventType == "UserCreated" {
        _, err := p.db.ExecContext(ctx, "INSERT INTO users_read_model ...")
        return err
    }
    return nil
}

Name

Returns unique projection name used for checkpoint tracking.

func (p *MyProjection) Name() string {
    return "my_projection"
}

Handle

Processes a single event. Projections manage their own persistence.

func (p *MyProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
    // Process event
    return nil
}

Parameters:

  • ctx: Context for cancellation
  • event: Event to process (passed by value to enforce immutability)

Returns:

  • error: Return error to stop projection processing

Important:

  • Make projections idempotent: events may be reprocessed on crash recovery
  • Projections manage their own persistence connections (database, message broker, etc.)
  • For SQL projections, consider managing transactions within your Handle implementation
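One way to meet the idempotency requirement is to remember the highest global position already applied and skip anything at or below it. The sketch below does this in memory; counterProjection is hypothetical, and it assumes events reach the handler in increasing global position order. A SQL projection would typically persist that position (or use an upsert) together with the read-model change.

```go
package main

import "fmt"

// persistedEvent is a reduced stand-in for es.PersistedEvent.
type persistedEvent struct {
	GlobalPosition int64
	EventType      string
}

// counterProjection (hypothetical) counts UserCreated events and remembers
// the last global position it has applied.
type counterProjection struct {
	lastPosition int64
	users        int
}

// handle applies an event at most once: replays at or below lastPosition are ignored.
func (p *counterProjection) handle(e persistedEvent) {
	if e.GlobalPosition <= p.lastPosition {
		return // already applied, e.g. redelivered after a crash
	}
	if e.EventType == "UserCreated" {
		p.users++
	}
	p.lastPosition = e.GlobalPosition
}

// replayTwice delivers the same events twice to simulate reprocessing.
func replayTwice() (users int, last int64) {
	events := []persistedEvent{
		{GlobalPosition: 1, EventType: "UserCreated"},
		{GlobalPosition: 2, EventType: "UserUpdated"},
		{GlobalPosition: 3, EventType: "UserCreated"},
	}
	p := &counterProjection{}
	for pass := 0; pass < 2; pass++ {
		for _, e := range events {
			p.handle(e)
		}
	}
	return p.users, p.lastPosition
}

func main() {
	users, last := replayTwice()
	fmt.Println(users, last)
}
```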

postgres.Processor

PostgreSQL-specific processor for running projections. Manages SQL transactions and checkpointing internally.

Breaking Change (v1.2.0): Moved from projection.Processor to adapter-specific implementations. Each adapter (postgres, mysql, sqlite) has its own processor.

NewProcessor

Creates a new PostgreSQL projection processor.

func NewProcessor(db *sql.DB, store *postgres.Store, config *projection.ProcessorConfig) *Processor

Parameters:

  • db: PostgreSQL database connection
  • store: PostgreSQL store (implements EventReader and CheckpointStore)
  • config: Processor configuration (passed by pointer)

Returns:

  • *Processor: Processor instance

Example:

store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)

projection.Processor (Legacy)

Deprecated in v1.2.0. Use an adapter-specific processor such as postgres.Processor instead.

Processes events for a projection.

type Processor struct {
    // unexported fields
}

NewProcessor

Creates a new projection processor.

func NewProcessor(txProvider TxProvider, eventReader store.EventReader, checkpointStore store.CheckpointStore, config *ProcessorConfig) *Processor

Parameters:

  • txProvider: Transaction provider (typically *sql.DB)
  • eventReader: Event reader implementation
  • checkpointStore: Checkpoint store implementation
  • config: Processor configuration (passed by pointer)

Breaking Change (v1.2.0): Added checkpointStore parameter to decouple from database-specific checkpoint logic. The txProvider replaces the previous db parameter to support non-SQL implementations.

Breaking Change (v1.1.0): Changed from ProcessorConfig (value) to *ProcessorConfig (pointer) for better performance.

Returns:

  • *Processor: Processor instance

Run

Runs the projection until context is cancelled or an error occurs.

func (p *Processor) Run(ctx context.Context, projection Projection) error

Parameters:

  • ctx: Context for cancellation
  • projection: Projection to run

Returns:

  • error: Error if projection handler fails, or ctx.Err() on cancellation

Example:

store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := processor.Run(ctx, myProjection)
if errors.Is(err, context.Canceled) {
    log.Println("Projection stopped gracefully")
}

projection.ProcessorConfig

Configuration for projection processor.

type ProcessorConfig struct {
    PartitionStrategy PartitionStrategy  // Partitioning strategy
    Logger            es.Logger          // Optional logger (nil = disabled)
    BatchSize         int                // Events per batch
    PartitionKey      int                // This worker's partition (0-indexed)
    TotalPartitions   int                // Total number of partitions
}

Note: Fields are ordered by size (interfaces/pointers first) for optimal memory layout.

Breaking Change (v1.2.0): Removed EventsTable and CheckpointsTable fields. Table configuration is now handled by the adapter's store configuration.

DefaultProcessorConfig

Returns default configuration.

func DefaultProcessorConfig() ProcessorConfig {
    return ProcessorConfig{
        BatchSize:         100,
        PartitionKey:      0,
        TotalPartitions:   1,
        PartitionStrategy: HashPartitionStrategy{},
        Logger:            nil,  // No logging by default
    }
}

projection.PartitionStrategy

Interface for partitioning strategies.

type PartitionStrategy interface {
    ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool
}

projection.HashPartitionStrategy

Default hash-based partitioning strategy.

type HashPartitionStrategy struct{}

func (HashPartitionStrategy) ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool

Uses FNV-1a hash for deterministic, even distribution.
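To illustrate how hash partitioning assigns each aggregate to exactly one worker, here is a sketch built on the standard hash/fnv package. The exact scheme is an assumption (32-bit FNV-1a of the aggregate ID, modulo the partition count); the library's implementation may differ in hash width or other details, and shouldProcess and owners are hypothetical free functions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shouldProcess reports whether the worker that owns partitionKey handles aggregateID.
// Assumption: 32-bit FNV-1a of the ID, reduced modulo totalPartitions.
func shouldProcess(aggregateID string, partitionKey, totalPartitions int) bool {
	if totalPartitions <= 1 {
		return true // a single worker handles everything
	}
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // hash.Hash writes never return an error
	return int(h.Sum32()%uint32(totalPartitions)) == partitionKey
}

// owners counts how many of the totalPartitions workers claim aggregateID.
func owners(aggregateID string, totalPartitions int) int {
	n := 0
	for key := 0; key < totalPartitions; key++ {
		if shouldProcess(aggregateID, key, totalPartitions) {
			n++
		}
	}
	return n
}

func main() {
	for _, id := range []string{"user-1", "user-2", "user@example.com"} {
		fmt.Printf("%s is claimed by %d of 4 workers\n", id, owners(id, 4))
	}
}
```

Because the hash depends only on the aggregate ID, all events of one aggregate land on the same worker, which preserves per-aggregate ordering while spreading load.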

Runner Package

runner.Runner

Orchestrates multiple projections.

type Runner struct {
    // unexported fields
}

New

Creates a new runner.

func New() *Runner

Run

Runs multiple projections concurrently.

func (r *Runner) Run(ctx context.Context, runners []ProjectionRunner) error

Parameters:

  • ctx: Context for cancellation
  • runners: Projections paired with their processors (see runner.ProjectionRunner)

Returns:

  • error: First error from any projection, or ctx.Err()

Example:

store := postgres.NewStore(postgres.DefaultStoreConfig())
r := runner.New() // Named r so the runner package is not shadowed

// Create processors for each projection
config1 := projection.DefaultProcessorConfig()
processor1 := postgres.NewProcessor(db, store, &config1)

config2 := projection.DefaultProcessorConfig()
processor2 := postgres.NewProcessor(db, store, &config2)

// Run projections
runners := []runner.ProjectionRunner{
    {Projection: proj1, Processor: processor1},
    {Projection: proj2, Processor: processor2},
}

err := r.Run(ctx, runners)
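The "run concurrently, return the first error" behaviour can be pictured with plain goroutines and a shared context. The sketch below is one possible structure under that reading, not the runner package's actual implementation; runFunc, runAll, and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runFunc stands in for a processor running one projection until ctx is cancelled.
type runFunc func(ctx context.Context) error

// runAll starts every runFunc in its own goroutine. The first failure is
// recorded and cancels the shared context so the remaining functions stop.
func runAll(ctx context.Context, fns []runFunc) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, fn := range fns {
		wg.Add(1)
		go func(fn runFunc) {
			defer wg.Done()
			if err := fn(ctx); err != nil {
				once.Do(func() {
					first = err // only the first error is kept
					cancel()
				})
			}
		}(fn)
	}
	wg.Wait() // all goroutines have exited before first is read
	return first
}

// demo runs two well-behaved projections next to one that fails immediately.
func demo() string {
	failing := func(ctx context.Context) error { return errors.New("handler failed") }
	waiting := func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	}
	return runAll(context.Background(), []runFunc{waiting, failing, waiting}).Error()
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the cancellation errors returned by the other goroutines are discarded, so the caller sees the original failure rather than context.Canceled.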

runner.ProjectionRunner

Pairs a projection with its adapter-specific processor.

type ProjectionRunner struct {
    Projection projection.Projection
    Processor  projection.ProcessorRunner
}

Fields:

  • Projection: The projection to run
  • Processor: Adapter-specific processor (postgres.Processor, mysql.Processor, etc.)

Example:

store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
config.PartitionKey = 0
config.TotalPartitions = 4
processor := postgres.NewProcessor(db, store, &config)

r := runner.New() // Named r so the runner package is not shadowed
err := r.Run(ctx, []runner.ProjectionRunner{
    {Projection: &MyProjection{}, Processor: processor},
})

PostgreSQL Adapter

postgres.Store

PostgreSQL implementation of EventStore, EventReader, AggregateStreamReader, and CheckpointStore.

type Store struct {
    // unexported fields
}

NewStore

Creates a new PostgreSQL store.

func NewStore(config StoreConfig) *Store

Parameters:

  • config: Store configuration

Returns:

  • *Store: Store instance

Example:

store := postgres.NewStore(postgres.DefaultStoreConfig())

postgres.StoreConfig

Configuration for PostgreSQL store.

type StoreConfig struct {
    Logger              es.Logger  // Optional logger (nil = disabled)
    EventsTable         string     // Events table name
    AggregateHeadsTable string     // Aggregate heads table name
    CheckpointsTable    string     // Checkpoints table name
}

Note: Fields are ordered by size (interfaces/pointers first) for optimal memory layout.

DefaultStoreConfig

Returns default configuration.

func DefaultStoreConfig() StoreConfig {
    return StoreConfig{
        EventsTable:         "events",
        AggregateHeadsTable: "aggregate_heads",
        CheckpointsTable:    "projection_checkpoints",
        Logger:              nil,  // No logging by default
    }
}

Error Types

store.ErrOptimisticConcurrency

Returned when a version conflict is detected.

var ErrOptimisticConcurrency = errors.New("optimistic concurrency conflict")

Example:

_, err := store.Append(ctx, tx, es.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
    // Retry transaction
}

store.ErrNoEvents

Returned when attempting to append zero events.

var ErrNoEvents = errors.New("no events to append")

projection.ErrProjectionStopped

Returned when a projection stops due to handler error.

var ErrProjectionStopped = errors.New("projection stopped")

runner.ErrNoProjections

Returned when no projections are provided.

var ErrNoProjections = errors.New("no projections provided")

runner.ErrInvalidPartitionConfig

Returned when partition configuration is invalid.

var ErrInvalidPartitionConfig = errors.New("invalid partition configuration")

See Also