Observability

Logging, tracing, and monitoring capabilities for pupsourcing applications.

Table of Contents

  • Overview
  • Logging
  • Distributed Tracing
  • Metrics
  • Best Practices
  • Troubleshooting

Overview

pupsourcing provides comprehensive observability features:

  1. Logging - Optional logger injection without forced dependencies
  2. Distributed Tracing - Built-in TraceID, CorrelationID, and CausationID support
  3. Metrics - Integration patterns with monitoring systems

Logging

Logger Interface

Minimal interface enabling integration with any logging library:

type Logger interface {
    Debug(ctx context.Context, msg string, keyvals ...interface{})
    Info(ctx context.Context, msg string, keyvals ...interface{})
    Error(ctx context.Context, msg string, keyvals ...interface{})
}

Event Store Logging

Logs append operations, read operations, and concurrency conflicts:

import "github.com/getpup/pupsourcing/es/adapters/postgres"

type MyLogger struct {
    logger *slog.Logger
}

func (l *MyLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.DebugContext(ctx, msg, keyvals...)
}

func (l *MyLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.InfoContext(ctx, msg, keyvals...)
}

func (l *MyLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.ErrorContext(ctx, msg, keyvals...)
}

// Inject logger
config := postgres.DefaultStoreConfig()
config.Logger = &MyLogger{logger: slog.Default()}
store := postgres.NewStore(config)

Projection Logging

Logs processor lifecycle, batch progress, checkpoints, and errors:

import "github.com/getpup/pupsourcing/es/projection"

config := projection.DefaultProcessorConfig()
config.Logger = &MyLogger{logger: slog.Default()}
processor := projection.NewProcessor(db, store, &config)

Zero-Overhead Design

Logging disabled by default with no performance impact:

// No logger configured = zero overhead
config := postgres.DefaultStoreConfig()  // Logger is nil
store := postgres.NewStore(config)

All logging operations check logger != nil before executing, so a nil logger adds no allocations and no call overhead.
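
For illustration, the guard pattern looks roughly like the sketch below; exampleStore and doWork are hypothetical names used for this sketch, not the library's internals. Each call site checks for a configured logger before assembling the key-value arguments, so a nil logger costs only a pointer comparison.

import "context"

// Logger matches the interface shown above.
type Logger interface {
    Debug(ctx context.Context, msg string, keyvals ...interface{})
    Info(ctx context.Context, msg string, keyvals ...interface{})
    Error(ctx context.Context, msg string, keyvals ...interface{})
}

// exampleStore is a stand-in for a store holding an optional logger.
type exampleStore struct {
    logger Logger // nil when logging is disabled
}

func (s *exampleStore) doWork(ctx context.Context, count int) {
    // The nil check runs before the keyvals are assembled, so nothing is
    // allocated and no call is made when logging is disabled.
    if s.logger != nil {
        s.logger.Debug(ctx, "doing work", "count", count)
    }
    // ... actual work ...
}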

Integration Examples

Standard Library log

import "log"

type StdLogger struct{}

func (l *StdLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    log.Printf("[DEBUG] %s %v", msg, keyvals)
}

func (l *StdLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    log.Printf("[INFO] %s %v", msg, keyvals)
}

func (l *StdLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    log.Printf("[ERROR] %s %v", msg, keyvals)
}

slog (Go 1.21+)

import "log/slog"

type SlogLogger struct {
    logger *slog.Logger
}

func (l *SlogLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.DebugContext(ctx, msg, keyvals...)
}

func (l *SlogLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.InfoContext(ctx, msg, keyvals...)
}

func (l *SlogLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.ErrorContext(ctx, msg, keyvals...)
}

zap

import "go.uber.org/zap"

type ZapLogger struct {
    logger *zap.SugaredLogger
}

func (l *ZapLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.Debugw(msg, keyvals...)
}

func (l *ZapLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.Infow(msg, keyvals...)
}

func (l *ZapLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.Errorw(msg, keyvals...)
}

zerolog

import "github.com/rs/zerolog"

type ZerologLogger struct {
    logger zerolog.Logger
}

func (l *ZerologLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.Debug().Fields(keyvals).Msg(msg)
}

func (l *ZerologLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.Info().Fields(keyvals).Msg(msg)
}

func (l *ZerologLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.Error().Fields(keyvals).Msg(msg)
}

logrus

import "github.com/sirupsen/logrus"

type LogrusLogger struct {
    logger *logrus.Logger
}

func (l *LogrusLogger) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.WithFields(toLogrusFields(keyvals)).Debug(msg)
}

func (l *LogrusLogger) Info(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.WithFields(toLogrusFields(keyvals)).Info(msg)
}

func (l *LogrusLogger) Error(ctx context.Context, msg string, keyvals ...interface{}) {
    l.logger.WithFields(toLogrusFields(keyvals)).Error(msg)
}

func toLogrusFields(keyvals []interface{}) logrus.Fields {
    fields := make(logrus.Fields)
    for i := 0; i < len(keyvals); i += 2 {
        if i+1 < len(keyvals) {
            fields[fmt.Sprint(keyvals[i])] = keyvals[i+1]
        }
    }
    return fields
}

See the with-logging example for a complete working demonstration.

Distributed Tracing

pupsourcing includes built-in support for distributed tracing through three optional string fields in every event:

  • TraceID - Links all events in a distributed operation (e.g., a user request across multiple services)
  • CorrelationID - Links related events across aggregates within the same business transaction
  • CausationID - Identifies the event or command that caused this event

These fields accept any string format (UUID, ULID, or custom IDs) for maximum flexibility.

Using Trace IDs

Extract the trace ID from your request context and propagate it to events:

import (
    "context"
    "database/sql"
    "time"

    "github.com/google/uuid"
    "go.opentelemetry.io/otel/trace"

    "github.com/getpup/pupsourcing/es"
    "github.com/getpup/pupsourcing/es/adapters/postgres"
)

func HandleRequest(ctx context.Context, db *sql.DB, store *postgres.Store, orderID string, payload []byte) error {
    // Extract the OpenTelemetry trace ID from the request context
    span := trace.SpanFromContext(ctx)
    traceID := span.SpanContext().TraceID()

    // Convert to string - the 128-bit trace ID renders as a 32-character hex string
    traceIDStr := traceID.String()

    // Create the event with the trace ID attached
    event := es.Event{
        AggregateType: "Order",
        AggregateID:   orderID,
        EventID:       uuid.New(),
        EventType:     "OrderCreated",
        EventVersion:  1,
        Payload:       payload,
        Metadata:      []byte(`{}`),
        CreatedAt:     time.Now(),
        TraceID:       es.NullString{String: traceIDStr, Valid: true},
    }

    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    if _, err := store.Append(ctx, tx, es.Any(), []es.Event{event}); err != nil {
        return err
    }

    return tx.Commit()
}

Propagating Trace Context in Projections

When processing events in projections, propagate the trace ID to maintain observability:

type TracedProjection struct {
    tracer trace.Tracer
}

func (p *TracedProjection) Handle(ctx context.Context, event es.PersistedEvent) error {
    // Extract trace ID from event if present
    if event.TraceID.Valid {
        // Parse the trace ID string (assuming it's in hex format)
        traceID, err := trace.TraceIDFromHex(event.TraceID.String)
        if err == nil {
            // Build a remote parent span context with the trace ID.
            // Note: OpenTelemetry only honors the parent if the SpanContext is
            // valid, which also requires a non-zero SpanID; if you persist only
            // the trace ID, consider attaching it as a span attribute or link.
            spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
                TraceID:    traceID,
                TraceFlags: trace.FlagsSampled,
                Remote:     true,
            })
            ctx = trace.ContextWithSpanContext(ctx, spanCtx)
        }
    }

    // Start a new span for projection processing
    ctx, span := p.tracer.Start(ctx, "projection.handle",
        trace.WithAttributes(
            attribute.String("event.type", event.EventType),
            attribute.String("aggregate.type", event.AggregateType),
            attribute.String("aggregate.id", event.AggregateID),
        ),
    )
    defer span.End()

    // Process event with trace context
    // ...

    return nil
}

Correlation and Causation

Use CorrelationID and CausationID to track event relationships:

// Generate a correlation ID for the business transaction
correlationID := uuid.New().String()

// Original command creates first event
originalEvent := es.Event{
    EventID:       uuid.New(),
    AggregateID:   orderID,
    EventType:     "OrderCreated",
    CorrelationID: es.NullString{String: correlationID, Valid: true},
    // ... other fields
}

// Subsequent event caused by the first
followUpEvent := es.Event{
    EventID:       uuid.New(),
    AggregateID:   inventoryID,
    EventType:     "InventoryReserved",
    CorrelationID: es.NullString{String: correlationID, Valid: true},
    CausationID:   es.NullString{String: originalEvent.EventID.String(), Valid: true},
    // ... other fields
}

This creates a clear chain of causality:

  • CorrelationID links all events in the same business transaction
  • CausationID shows which event triggered this one

OpenTelemetry Integration Example

If you'd like to add distributed tracing spans to your event store operations, you can create a wrapper around the store that instruments the Append and read methods with OpenTelemetry. This allows you to:

  • Track the performance of event append operations
  • See which aggregates are being written to
  • Correlate event store operations with other parts of your distributed system
  • Identify bottlenecks in event processing

Here's how to create a tracing wrapper:

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// TracingEventStore wraps a postgres.Store to add OpenTelemetry spans
type TracingEventStore struct {
    store  *postgres.Store
    tracer trace.Tracer
}

func NewTracingEventStore(store *postgres.Store) *TracingEventStore {
    return &TracingEventStore{
        store:  store,
        tracer: otel.Tracer("pupsourcing"),
    }
}

// Append wraps the store's Append method with a span
func (s *TracingEventStore) Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error) {
    // Start a new span for this append operation; guard the slice access so an
    // empty batch doesn't panic
    attrs := []attribute.KeyValue{attribute.Int("event.count", len(events))}
    if len(events) > 0 {
        attrs = append(attrs,
            attribute.String("aggregate.type", events[0].AggregateType),
            attribute.String("aggregate.id", events[0].AggregateID),
        )
    }
    ctx, span := s.tracer.Start(ctx, "eventstore.append", trace.WithAttributes(attrs...))
    defer span.End()

    // Call the underlying store
    result, err := s.store.Append(ctx, tx, expectedVersion, events)
    if err != nil {
        span.RecordError(err)
        return es.AppendResult{}, err
    }

    // Add the resulting positions as span attributes
    span.SetAttributes(attribute.Int64Slice("positions", result.GlobalPositions))
    return result, nil
}

You can apply the same pattern to wrap ReadEvents and ReadAggregateStream methods, creating spans for read operations to track query performance and access patterns.
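
As a sketch of the read side, the wrapper below follows the same pattern; the parameter list and return type of ReadAggregateStream shown here are assumptions made for illustration, so match them to the actual method on postgres.Store in your version.

// ReadAggregateStream wraps the store's read with a span. The signature used
// here (aggregate type + ID returning persisted events) is an assumption;
// adjust it to the real method exposed by postgres.Store.
func (s *TracingEventStore) ReadAggregateStream(ctx context.Context, tx es.DBTX, aggregateType, aggregateID string) ([]es.PersistedEvent, error) {
    ctx, span := s.tracer.Start(ctx, "eventstore.read_aggregate_stream",
        trace.WithAttributes(
            attribute.String("aggregate.type", aggregateType),
            attribute.String("aggregate.id", aggregateID),
        ),
    )
    defer span.End()

    events, err := s.store.ReadAggregateStream(ctx, tx, aggregateType, aggregateID)
    if err != nil {
        span.RecordError(err)
        return nil, err
    }

    span.SetAttributes(attribute.Int("event.count", len(events)))
    return events, nil
}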

Metrics

For metrics integration with Prometheus and other monitoring systems, see the Deployment Guide's Monitoring section.

Key metrics to track:

  • Event append rate
  • Event append latency
  • Projection lag (events behind)
  • Projection processing rate
  • Projection errors
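
As a starting point, the sketch below registers instruments for these metrics with Prometheus via prometheus/client_golang; the metric names, labels, and variables are illustrative suggestions, not anything pupsourcing provides.

import "github.com/prometheus/client_golang/prometheus"

// Suggested metric definitions; names and labels are illustrative.
var (
    appendLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name: "eventstore_append_duration_seconds",
        Help: "Latency of event append operations.",
    })
    appendTotal = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "eventstore_append_events_total",
        Help: "Total number of events appended.",
    })
    projectionLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Name: "projection_lag_events",
        Help: "Number of events a projection is behind the head of the log.",
    }, []string{"projection"})
    projectionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "projection_errors_total",
        Help: "Total number of projection processing errors.",
    }, []string{"projection"})
)

func init() {
    prometheus.MustRegister(appendLatency, appendTotal, projectionLag, projectionErrors)
}

You would observe appendLatency and appendTotal from a thin store wrapper (similar to the tracing wrapper above) and update projectionLag and projectionErrors from your projection handlers or a small monitoring loop.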

Best Practices

Logging

  1. Use appropriate log levels
     • Debug: Detailed diagnostic information
     • Info: Significant operational events
     • Error: Error conditions that require attention

  2. Include context
     • Always pass the context to logging methods
     • Include relevant key-value pairs (aggregate IDs, event types, etc.)

  3. Avoid PII in logs
     • Don't log sensitive user data
     • Consider redacting event payloads

Tracing

  1. Always propagate trace context
     • Extract trace IDs from incoming requests
     • Include trace IDs in all events
     • Propagate to downstream services

  2. Use correlation IDs for business transactions
     • Generate at the start of a business transaction
     • Include in all related events across aggregates

  3. Track causation chains
     • Set CausationID when one event triggers another
     • Helps debug complex event chains

Metrics

  1. Monitor projection lag
     • Alert when projections fall too far behind
     • Critical for user-facing read models

  2. Track error rates
     • Monitor projection failures
     • Alert on sustained error conditions

  3. Measure latencies
     • P50, P95, P99 for event appends
     • Projection processing time per event

Troubleshooting

High Projection Lag

Check:

  1. Projection processing performance (slow queries?)
  2. Batch size configuration
  3. Need for more workers (horizontal scaling)
  4. Database connection pool size

Optimistic Concurrency Conflicts

The logger will show these as ERROR level entries with aggregate details. Common causes:

  1. Multiple services writing to the same aggregate
  2. Retry logic without backoff (see the backoff sketch below)
  3. Race conditions in application code
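
Where conflicts stem from retries without backoff, wrapping the append in a small exponential backoff usually resolves them. The sketch below is generic: appendFn is a placeholder for your own function that opens a transaction, re-reads current state, and calls Append, and the concurrency-conflict check is left as a TODO because the concrete error type depends on your pupsourcing version.

import (
    "context"
    "time"
)

// retryAppend retries a conflicting append with exponential backoff.
// appendFn is a placeholder for your own function that opens a transaction,
// re-reads current state, and calls store.Append.
func retryAppend(ctx context.Context, appendFn func(context.Context) error) error {
    backoff := 50 * time.Millisecond
    var err error
    for attempt := 0; attempt < 5; attempt++ {
        if err = appendFn(ctx); err == nil {
            return nil
        }
        // TODO: only retry on the store's concurrency-conflict error;
        // check the error type exposed by your pupsourcing version.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
            backoff *= 2
        }
    }
    return err
}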

Missing Events in Projections

Use logging to verify:

  1. Events are being appended (check store logs)
  2. Projection is processing (check processor logs)
  3. Partition key is correct (for partitioned projections)