API Reference¶
Complete API documentation for pupsourcing.
Core Types¶
es.Event¶
Represents an immutable domain event before persistence.
type Event struct {
CreatedAt time.Time
BoundedContext string // Bounded context this event belongs to (e.g., "Identity", "Billing")
AggregateType string // Type of aggregate (e.g., "User", "Order")
EventType string // Type of event (e.g., "UserCreated")
AggregateID string // Aggregate instance identifier (UUID string, email, or any identifier)
Payload []byte // Event data (typically JSON)
Metadata []byte // Additional metadata (typically JSON)
EventVersion int // Schema version of this event type (default: 1)
CausationID es.NullString // ID of event/command that caused this event
CorrelationID es.NullString // Links related events across aggregates
TraceID es.NullString // Distributed tracing ID
EventID uuid.UUID // Unique event identifier
}
Note: AggregateVersion and GlobalPosition are assigned by the store during Append. The field order is optimized for memory layout.
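For orientation, a fully populated event might be built like this (the payload shape and identifiers are illustrative, and a uuid package with a New() constructor is assumed):
payload, _ := json.Marshal(map[string]string{"email": "user@example.com"})
event := es.Event{
    EventID:        uuid.New(),
    CreatedAt:      time.Now().UTC(),
    BoundedContext: "Identity",
    AggregateType:  "User",
    AggregateID:    uuid.New().String(),
    EventType:      "UserCreated",
    EventVersion:   1,
    Payload:        payload,
    Metadata:       []byte(`{}`),
}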
es.ExpectedVersion¶
Controls optimistic concurrency for aggregate updates.
type ExpectedVersion struct {
// internal value
}
// Constructors
func Any() ExpectedVersion // No version check
func NoStream() ExpectedVersion // Aggregate must not exist
func Exact(version int64) ExpectedVersion // Aggregate must be at specific version
Usage:
- Any(): Skip version validation. Use when you don't need concurrency control.
- NoStream(): Enforce that the aggregate doesn't exist. Use for aggregate creation and uniqueness enforcement.
- Exact(N): Enforce that the aggregate is at version N. Use for normal command handling with optimistic concurrency.
Examples:
// Creating a new aggregate
_, err := store.Append(ctx, tx, es.NoStream(), []es.Event{event})
// Updating an existing aggregate at version 5
_, err := store.Append(ctx, tx, es.Exact(5), []es.Event{event})
// No concurrency check
_, err := store.Append(ctx, tx, es.Any(), []es.Event{event})
// Uniqueness enforcement via reservation aggregate
email := "user@example.com"
reservationEvent := es.Event{
BoundedContext: "Identity",
AggregateType: "EmailReservation",
AggregateID: email, // Use email as aggregate ID
// ... other fields
}
_, err := store.Append(ctx, tx, es.NoStream(), []es.Event{reservationEvent})
// Second attempt with same email will fail with ErrOptimisticConcurrency
es.PersistedEvent¶
Represents an event that has been stored, including position information.
type PersistedEvent struct {
CreatedAt time.Time
BoundedContext string
AggregateType string
EventType string
AggregateID string
Payload []byte
Metadata []byte
GlobalPosition int64 // Position in global event log (assigned by store)
AggregateVersion int64 // Version of aggregate after this event (assigned by store)
EventVersion int
CausationID es.NullString
CorrelationID es.NullString
TraceID es.NullString
EventID uuid.UUID
}
Note: The field order is optimized for memory layout.
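In a projection or reader, the store-assigned fields are what you typically log or checkpoint against; a small sketch (UserCreatedPayload is a hypothetical type):
var payload UserCreatedPayload // hypothetical payload type
if err := json.Unmarshal(event.Payload, &payload); err != nil {
    return err
}
log.Printf("%s at global position %d (aggregate %s v%d)",
    event.EventType, event.GlobalPosition, event.AggregateID, event.AggregateVersion)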
es.DBTX¶
Database transaction interface used throughout the library.
type DBTX interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
Implemented by both *sql.DB and *sql.Tx.
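Because both *sql.DB and *sql.Tx satisfy DBTX, reads can run straight against the connection pool while writes run inside an explicit transaction; a brief sketch:
// Read directly with *sql.DB (no explicit transaction).
stream, err := store.ReadAggregateStream(ctx, db, "User", userID, nil, nil)
if err != nil {
    return err
}

// Write inside a *sql.Tx so Append and your own statements commit together.
tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()
if _, err := store.Append(ctx, tx, es.Exact(stream.Version()), events); err != nil {
    return err
}
return tx.Commit()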
Observability¶
es.Logger¶
Optional logging interface for instrumenting the library.
type Logger interface {
Debug(ctx context.Context, msg string, keyvals ...interface{})
Info(ctx context.Context, msg string, keyvals ...interface{})
Error(ctx context.Context, msg string, keyvals ...interface{})
}
Usage:
type MyLogger struct {
logger *slog.Logger
}
func (l *MyLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
l.logger.DebugContext(ctx, msg, keyvals...)
}
func (l *MyLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
l.logger.InfoContext(ctx, msg, keyvals...)
}
func (l *MyLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
l.logger.ErrorContext(ctx, msg, keyvals...)
}
// Inject into store
config := postgres.DefaultStoreConfig()
config.Logger = &MyLogger{logger: slog.Default()}
store := postgres.NewStore(config)
See the Observability Guide for complete documentation and examples.
es.NoOpLogger¶
Default logger implementation that does nothing. Used internally when no logger is configured.
type NoOpLogger struct{}
func (NoOpLogger) Debug(_ context.Context, _ string, _ ...interface{}) {}
func (NoOpLogger) Info(_ context.Context, _ string, _ ...interface{}) {}
func (NoOpLogger) Error(_ context.Context, _ string, _ ...interface{}) {}
Event Store¶
store.EventStore¶
Interface for appending events.
type EventStore interface {
Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)
}
Append¶
Atomically appends events within a transaction with optimistic concurrency control.
func (s *Store) Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)
Parameters:
- ctx: Context for cancellation
- tx: Database transaction (you control transaction boundaries)
- expectedVersion: Expected aggregate version (Any, NoStream, or Exact)
- events: Events to append (must all be for the same aggregate)
Returns:
- es.AppendResult: Result containing persisted events and global positions
- Events: The persisted events with assigned versions
- GlobalPositions: Assigned global positions for the events
- Helper methods: FromVersion(), ToVersion()
- error: Error if any (including ErrOptimisticConcurrency)
Errors:
- store.ErrOptimisticConcurrency: Version conflict or expectation mismatch
- store.ErrNoEvents: Empty events slice
Example:
tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()
// Create a new aggregate
result, err := store.Append(ctx, tx, es.NoStream(), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
// Aggregate already exists
}
fmt.Printf("Aggregate version: %d\n", result.ToVersion())
fmt.Printf("Positions: %v\n", result.GlobalPositions)
// Update existing aggregate at version 3
result, err = store.Append(ctx, tx, es.Exact(3), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
// Version mismatch - another transaction updated the aggregate
// Reload aggregate state and retry
}
tx.Commit()
store.EventReader¶
Interface for reading events sequentially.
type EventReader interface {
ReadEvents(ctx context.Context, tx es.DBTX, fromPosition int64, limit int) ([]es.PersistedEvent, error)
}
ReadEvents¶
Reads events starting from a position.
func (s *Store) ReadEvents(ctx context.Context, tx es.DBTX, fromPosition int64, limit int) ([]es.PersistedEvent, error)
Parameters:
- ctx: Context
- tx: Database transaction
- fromPosition: Start position (exclusive - returns events AFTER this position)
- limit: Maximum number of events to return
Returns:
- []es.PersistedEvent: Events ordered by global_position
- error: Error if any
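Because fromPosition is exclusive, a simple catch-up loop can page through the log by feeding the last GlobalPosition back in (the batch size is illustrative):
var position int64 // 0 = before the first event
for {
    batch, err := store.ReadEvents(ctx, db, position, 100)
    if err != nil {
        return err
    }
    if len(batch) == 0 {
        break // caught up
    }
    for _, e := range batch {
        // process e
    }
    position = batch[len(batch)-1].GlobalPosition
}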
store.AggregateStreamReader¶
Interface for reading events for a specific aggregate.
type AggregateStreamReader interface {
ReadAggregateStream(ctx context.Context, tx es.DBTX, aggregateType string,
aggregateID string, fromVersion, toVersion *int64) (es.Stream, error)
}
ReadAggregateStream¶
Reads all events for an aggregate, optionally filtered by version range.
func (s *Store) ReadAggregateStream(ctx context.Context, tx es.DBTX,
aggregateType string, aggregateID string,
fromVersion, toVersion *int64) (es.Stream, error)
Parameters:
- aggregateType: Type of aggregate (e.g., "User")
- aggregateID: Aggregate instance ID (string: UUID, email, or any identifier)
- fromVersion: Optional minimum version (inclusive). Pass nil for all.
- toVersion: Optional maximum version (inclusive). Pass nil for all.
Returns:
- es.Stream: Stream containing:
- AggregateType: The aggregate type
- AggregateID: The aggregate ID
- Events: Events ordered by aggregate_version
- Helper methods: Version(), IsEmpty(), Len()
- error: Error if any
Examples:
// Read all events for UUID-based aggregate
userID := uuid.New().String()
stream, _ := store.ReadAggregateStream(ctx, tx, "User", userID, nil, nil)
fmt.Printf("Aggregate version: %d\n", stream.Version())
fmt.Printf("Event count: %d\n", stream.Len())
// Process events
for _, event := range stream.Events {
// Handle event
}
// Read from version 5 onwards
from := int64(5)
stream, _ := store.ReadAggregateStream(ctx, tx, "User", userID, &from, nil)
// Read specific range
to := int64(10)
stream, _ := store.ReadAggregateStream(ctx, tx, "User", userID, &from, &to)
// Read reservation aggregate by email
stream, _ := store.ReadAggregateStream(ctx, tx, "EmailReservation", "user@example.com", nil, nil)
// Check if aggregate exists
if stream.IsEmpty() {
// Aggregate has no events
}
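A typical command handler folds the stream into in-memory state before deciding on new events; a minimal sketch assuming a hypothetical User type and payloads:
var user User // hypothetical aggregate state
for _, e := range stream.Events {
    switch e.EventType {
    case "UserCreated":
        _ = json.Unmarshal(e.Payload, &user)
    case "UserEmailChanged":
        var p struct{ Email string }
        _ = json.Unmarshal(e.Payload, &p)
        user.Email = p.Email
    }
}
// stream.Version() is the expected version for the next Append (es.Exact).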
Projections¶
Projections transform events into query-optimized read models. There are two types:
- Scoped Projections - Filter events by aggregate type (for read models)
- Global Projections - Receive all events (for integration publishers, audit logs)
projection.ScopedProjection¶
Optional interface for projections that only need specific aggregate types.
type ScopedProjection interface {
Projection
AggregateTypes() []string
}
When to Use¶
Use ScopedProjection for:
- Read models for specific aggregates (e.g., user profile view)
- Domain-specific denormalizations (e.g., order summary)
- Search indexes for specific entity types
Use Projection (global) for:
- Message broker integrations (Watermill, Kafka, RabbitMQ)
- Outbox pattern implementations
- Complete audit trails
- Cross-aggregate analytics
AggregateTypes¶
Returns the list of aggregate types this projection processes.
func (p *UserReadModelProjection) AggregateTypes() []string {
return []string{"User"} // Only receives User events
}
func (p *OrderUserProjection) AggregateTypes() []string {
return []string{"User", "Order"} // Receives User and Order events
}
Behavior:
- If list is non-empty, only events matching these aggregate types are delivered to Handle()
- If list is empty, projection receives all events (same as global projection)
- Filtering happens at processor level, not in handler (O(1) map lookup)
Example: Scoped Read Model¶
type UserReadModelProjection struct {
db *sql.DB
}
func (p *UserReadModelProjection) Name() string {
return "user_read_model"
}
func (p *UserReadModelProjection) AggregateTypes() []string {
return []string{"User"}
}
func (p *UserReadModelProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
// Only User events arrive here
// Projection manages its own database operations
switch event.EventType {
case "UserCreated":
// Update read model
case "UserUpdated":
// Update read model
}
return nil
}
projection.Projection¶
Interface for event projection handlers. Projections are storage-agnostic and can write to any destination (SQL, NoSQL, message brokers, search engines, etc.).
type Projection interface {
Name() string
Handle(ctx context.Context, event es.PersistedEvent) error
}
Breaking Change (v1.2.0): Removed tx es.DBTX parameter from Handle() method. Projections now manage their own persistence connections.
Example: Global Integration Publisher¶
type WatermillPublisher struct {
publisher message.Publisher
}
func (p *WatermillPublisher) Name() string {
return "system.integration.watermill.v1"
}
// No AggregateTypes() method - receives ALL events
func (p *WatermillPublisher) Handle(ctx context.Context, event es.PersistedEvent) error {
// Receives all events - publish to message broker
// Projection manages its own persistence (no transaction parameter)
msg := message.NewMessage(event.EventID.String(), event.Payload)
return p.publisher.Publish(event.EventType, msg)
}
Example: SQL Read Model Projection¶
type UserReadModelProjection struct {
db *sql.DB // Projection manages its own database connection
}
func (p *UserReadModelProjection) Name() string {
return "user_read_model"
}
func (p *UserReadModelProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
// Projection manages its own database operations
if event.EventType == "UserCreated" {
_, err := p.db.ExecContext(ctx, "INSERT INTO users_read_model ...")
return err
}
return nil
}
Name¶
Returns unique projection name used for checkpoint tracking.
func (p *MyProjection) Name() string {
return "my_projection"
}
Handle¶
Processes a single event. Projections manage their own persistence.
func (p *MyProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
// Process event
return nil
}
Parameters:
- ctx: Context for cancellation
- event: Event to process (passed by value to enforce immutability)
Returns:
- error: Return error to stop projection processing
Important:
- Make projections idempotent: events may be reprocessed on crash recovery
- Projections manage their own persistence connections (database, message broker, etc.)
- For SQL projections, consider managing transactions within your Handle implementation
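For SQL destinations, idempotency usually comes from upserts keyed on something stable such as the aggregate ID; a sketch under that assumption (table and payload shape are illustrative):
func (p *UserReadModelProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
    if event.EventType != "UserCreated" {
        return nil
    }
    var payload struct{ Email string }
    if err := json.Unmarshal(event.Payload, &payload); err != nil {
        return err
    }
    // Upsert so reprocessing the same event after a crash is harmless.
    _, err := p.db.ExecContext(ctx,
        `INSERT INTO users_read_model (id, email) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email`,
        event.AggregateID, payload.Email)
    return err
}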
postgres.Processor¶
PostgreSQL-specific processor for running projections. Manages SQL transactions and checkpointing internally.
Breaking Change (v1.2.0): Moved from projection.Processor to adapter-specific implementations. Each adapter (postgres, mysql, sqlite) has its own processor.
NewProcessor¶
Creates a new PostgreSQL projection processor.
func NewProcessor(db *sql.DB, store *postgres.Store, config *projection.ProcessorConfig) *Processor
Parameters:
- db: PostgreSQL database connection
- store: PostgreSQL store (implements EventReader and CheckpointStore)
- config: Processor configuration (passed by pointer)
Returns:
- *Processor: Processor instance
Example:
store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)
projection.Processor (Legacy)¶
Deprecated in v1.2.0. Use the adapter-specific processors (postgres, mysql, sqlite) instead.
Processes events for a projection.
type Processor struct {
// unexported fields
}
NewProcessor¶
Creates a new projection processor.
func NewProcessor(txProvider TxProvider, eventReader store.EventReader, checkpointStore store.CheckpointStore, config *ProcessorConfig) *Processor
Parameters:
- txProvider: Transaction provider (typically *sql.DB)
- eventReader: Event reader implementation
- checkpointStore: Checkpoint store implementation
- config: Processor configuration (passed by pointer)
Breaking Change (v1.2.0): Added checkpointStore parameter to decouple from database-specific checkpoint logic. The txProvider replaces the previous db parameter to support non-SQL implementations.
Breaking Change (v1.1.0): Changed from ProcessorConfig (value) to *ProcessorConfig (pointer) for better performance.
Returns:
- *Processor: Processor instance
Run¶
Runs the projection until context is cancelled or an error occurs.
func (p *Processor) Run(ctx context.Context, projection Projection) error
Parameters:
- ctx: Context for cancellation
- projection: Projection to run
Returns:
- error: Error if projection handler fails, or ctx.Err() on cancellation
Example:
store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
processor := postgres.NewProcessor(db, store, &config)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := processor.Run(ctx, myProjection)
if errors.Is(err, context.Canceled) {
log.Println("Projection stopped gracefully")
}
projection.ProcessorConfig¶
Configuration for projection processor.
type ProcessorConfig struct {
PartitionStrategy PartitionStrategy // Partitioning strategy
Logger es.Logger // Optional logger (nil = disabled)
BatchSize int // Events per batch
PartitionKey int // This worker's partition (0-indexed)
TotalPartitions int // Total number of partitions
}
Note: Fields are ordered by size (interfaces/pointers first) for optimal memory layout.
Breaking Change (v1.2.0): Removed EventsTable and CheckpointsTable fields. Table configuration is now handled by the adapter's store configuration.
DefaultProcessorConfig¶
Returns default configuration.
func DefaultProcessorConfig() ProcessorConfig {
return ProcessorConfig{
BatchSize: 100,
PartitionKey: 0,
TotalPartitions: 1,
PartitionStrategy: HashPartitionStrategy{},
Logger: nil, // No logging by default
}
}
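Individual fields can be overridden before handing the config to an adapter processor; for example (values are illustrative):
config := projection.DefaultProcessorConfig()
config.BatchSize = 500
config.Logger = &MyLogger{logger: slog.Default()}
processor := postgres.NewProcessor(db, store, &config)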
projection.PartitionStrategy¶
Interface for partitioning strategies.
type PartitionStrategy interface {
ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool
}
projection.HashPartitionStrategy¶
Default hash-based partitioning strategy.
type HashPartitionStrategy struct{}
func (HashPartitionStrategy) ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool
Uses FNV-1a hash for deterministic, even distribution.
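A custom strategy only needs to be deterministic for a given aggregate ID; a sketch of a hypothetical FNV-1a-based implementation (illustrative, not the library's exact code):
type MyPartitionStrategy struct{}

func (MyPartitionStrategy) ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool {
    if totalPartitions <= 1 {
        return true
    }
    h := fnv.New32a() // hash/fnv from the standard library
    h.Write([]byte(aggregateID))
    return int(h.Sum32()%uint32(totalPartitions)) == partitionKey
}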
Runner Package¶
runner.Runner¶
Orchestrates multiple projections.
type Runner struct {
// unexported fields
}
New¶
Creates a new runner.
func New() *Runner
Run¶
Runs multiple projections concurrently.
func (r *Runner) Run(ctx context.Context, runners []ProjectionRunner) error
Parameters:
- ctx: Context for cancellation
- runners: Projection and processor pairs to run
Returns:
- error: First error from any projection, or ctx.Err()
Example:
store := postgres.NewStore(postgres.DefaultStoreConfig())
r := runner.New()
// Create processors for each projection
config1 := projection.DefaultProcessorConfig()
processor1 := postgres.NewProcessor(db, store, &config1)
config2 := projection.DefaultProcessorConfig()
processor2 := postgres.NewProcessor(db, store, &config2)
// Run projections
runners := []runner.ProjectionRunner{
{Projection: proj1, Processor: processor1},
{Projection: proj2, Processor: processor2},
}
err := r.Run(ctx, runners)
runner.ProjectionRunner¶
Pairs a projection with its adapter-specific processor.
type ProjectionRunner struct {
Projection projection.Projection
Processor projection.ProcessorRunner
}
Fields:
- Projection: The projection to run
- Processor: Adapter-specific processor (postgres.Processor, mysql.Processor, etc.)
Example:
store := postgres.NewStore(postgres.DefaultStoreConfig())
config := projection.DefaultProcessorConfig()
config.PartitionKey = 0
config.TotalPartitions = 4
processor := postgres.NewProcessor(db, store, &config)
r := runner.New()
err := r.Run(ctx, []runner.ProjectionRunner{
{Projection: &MyProjection{}, Processor: processor},
})
PostgreSQL Adapter¶
postgres.Store¶
PostgreSQL implementation of EventStore, EventReader, AggregateStreamReader, and CheckpointStore.
type Store struct {
// unexported fields
}
NewStore¶
Creates a new PostgreSQL store.
func NewStore(config StoreConfig) *Store
Parameters:
- config: Store configuration
Returns:
- *Store: Store instance
Example:
store := postgres.NewStore(postgres.DefaultStoreConfig())
postgres.StoreConfig¶
Configuration for PostgreSQL store.
type StoreConfig struct {
Logger es.Logger // Optional logger (nil = disabled)
EventsTable string // Events table name
AggregateHeadsTable string // Aggregate heads table name
CheckpointsTable string // Checkpoints table name
}
Note: Fields are ordered by size (interfaces/pointers first) for optimal memory layout.
DefaultStoreConfig¶
Returns default configuration.
func DefaultStoreConfig() StoreConfig {
return StoreConfig{
EventsTable: "events",
AggregateHeadsTable: "aggregate_heads",
CheckpointsTable: "projection_checkpoints",
Logger: nil, // No logging by default
}
}
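Table names can be overridden before constructing the store, for example to namespace them per bounded context (the names below are illustrative):
config := postgres.DefaultStoreConfig()
config.EventsTable = "identity_events"
config.AggregateHeadsTable = "identity_aggregate_heads"
config.CheckpointsTable = "identity_projection_checkpoints"
store := postgres.NewStore(config)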
Error Types¶
store.ErrOptimisticConcurrency¶
Returned when a version conflict is detected.
var ErrOptimisticConcurrency = errors.New("optimistic concurrency conflict")
Example:
_, err := store.Append(ctx, tx, es.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
// Retry transaction
}
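A common recovery pattern is to retry the whole read-decide-append cycle a bounded number of times; a sketch where decide is a hypothetical function deriving new events from current state:
for attempt := 0; attempt < 3; attempt++ {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    stream, err := store.ReadAggregateStream(ctx, tx, "User", userID, nil, nil)
    if err != nil {
        tx.Rollback()
        return err
    }
    events := decide(stream) // hypothetical: produce new events from current state
    _, err = store.Append(ctx, tx, es.Exact(stream.Version()), events)
    if errors.Is(err, store.ErrOptimisticConcurrency) {
        tx.Rollback()
        continue // another writer won; reload and retry
    }
    if err != nil {
        tx.Rollback()
        return err
    }
    return tx.Commit()
}
return errors.New("gave up after repeated concurrency conflicts")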
store.ErrNoEvents¶
Returned when attempting to append zero events.
var ErrNoEvents = errors.New("no events to append")
projection.ErrProjectionStopped¶
Returned when a projection stops due to handler error.
var ErrProjectionStopped = errors.New("projection stopped")
runner.ErrNoProjections¶
Returned when no projections are provided.
var ErrNoProjections = errors.New("no projections provided")
runner.ErrInvalidPartitionConfig¶
Returned when partition configuration is invalid.
var ErrInvalidPartitionConfig = errors.New("invalid partition configuration")
See Also¶
- Getting Started - Setup and basic usage
- Core Concepts - Understanding the architecture
- Scaling Guide - Production patterns
- Examples - Working code examples