Observability¶
Overview¶
Observability in an event-sourced system spans three pillars:
- Logging — Structured output from the event store and worker for diagnostics.
- Distributed Tracing — Trace and correlation IDs that follow events from command to consumer.
- Metrics — Quantitative signals for monitoring health, performance, and lag.
Pupsourcing provides built-in support for logging and trace propagation. Metrics are your responsibility to collect, but the system exposes the data points you need.
Logging¶
The Logger Interface¶
Both the store and worker packages accept an optional logger through the store.Logger interface:
type Logger interface {
	Debug(ctx context.Context, msg string, keyvals ...interface{})
	Info(ctx context.Context, msg string, keyvals ...interface{})
	Error(ctx context.Context, msg string, keyvals ...interface{})
}
The interface uses structured key-value pairs for context. If no logger is configured, logging is a no-op (nil-safe) — zero overhead in production if you don't need it.
Configuring the Logger¶
On the event store:
import "github.com/pupsourcing/store/storepostgres"
eventStore := storepostgres.New(db, storepostgres.WithLogger(myLogger))
On the worker:
import "github.com/pupsourcing/worker"
w := worker.New(db, worker.WithLogger(myLogger))
Integration Examples¶
Using log/slog (recommended)¶
Go's built-in structured logger from the standard library (Go 1.21+):
import (
	"context"
	"log/slog"
	"os"

	"github.com/pupsourcing/store"
)

type SlogAdapter struct {
	logger *slog.Logger
}

// Compile-time check that SlogAdapter satisfies store.Logger.
var _ store.Logger = (*SlogAdapter)(nil)

func NewSlogAdapter() *SlogAdapter {
	return &SlogAdapter{
		logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
			Level: slog.LevelDebug,
		})),
	}
}

func (a *SlogAdapter) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
	a.logger.DebugContext(ctx, msg, keyvals...)
}

func (a *SlogAdapter) Info(ctx context.Context, msg string, keyvals ...interface{}) {
	a.logger.InfoContext(ctx, msg, keyvals...)
}

func (a *SlogAdapter) Error(ctx context.Context, msg string, keyvals ...interface{}) {
	a.logger.ErrorContext(ctx, msg, keyvals...)
}
Using log (standard library)¶
Minimal adapter for the standard library logger:
import (
	"context"
	"log"

	"github.com/pupsourcing/store"
)

type StdLogAdapter struct{}

// Compile-time check that StdLogAdapter satisfies store.Logger.
var _ store.Logger = (*StdLogAdapter)(nil)

func (a *StdLogAdapter) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
	log.Printf("[DEBUG] %s %v", msg, keyvals)
}

func (a *StdLogAdapter) Info(ctx context.Context, msg string, keyvals ...interface{}) {
	log.Printf("[INFO] %s %v", msg, keyvals)
}

func (a *StdLogAdapter) Error(ctx context.Context, msg string, keyvals ...interface{}) {
	log.Printf("[ERROR] %s %v", msg, keyvals)
}
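The `%v` verb above prints keyvals as a raw slice. If you prefer output closer to key=value form, a small helper (illustrative, not part of pupsourcing) can pair them up:

```go
package main

import (
	"fmt"
	"strings"
)

// formatKeyvals renders alternating key/value pairs as "key=value" tokens.
// A trailing unpaired element is ignored, matching common
// structured-logging behavior.
func formatKeyvals(keyvals []interface{}) string {
	var b strings.Builder
	for i := 0; i+1 < len(keyvals); i += 2 {
		if b.Len() > 0 {
			b.WriteByte(' ')
		}
		fmt.Fprintf(&b, "%v=%v", keyvals[i], keyvals[i+1])
	}
	return b.String()
}
```

The adapter's methods could then log `formatKeyvals(keyvals)` instead of the raw slice.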
Using zerolog¶
import (
	"context"

	"github.com/rs/zerolog"
)

type ZerologAdapter struct {
	logger zerolog.Logger
}

func (a *ZerologAdapter) Debug(ctx context.Context, msg string, keyvals ...interface{}) {
	a.logger.Debug().Fields(keyvals).Msg(msg)
}

func (a *ZerologAdapter) Info(ctx context.Context, msg string, keyvals ...interface{}) {
	a.logger.Info().Fields(keyvals).Msg(msg)
}

func (a *ZerologAdapter) Error(ctx context.Context, msg string, keyvals ...interface{}) {
	a.logger.Error().Fields(keyvals).Msg(msg)
}
Distributed Tracing¶
Trace Fields on Events¶
Every event in pupsourcing carries three optional tracing fields:
| Field | Purpose |
|---|---|
| TraceID | OpenTelemetry trace ID for distributed tracing |
| CorrelationID | Business-level correlation across related events |
| CausationID | ID of the event or command that caused this event |
These fields are stored as store.NullString — nullable strings that are NULL in the database when not set.
Setting Trace Fields¶
Use the generated event mapping options to set trace fields when creating store events:
import "myapp/infrastructure/persistence"
storeEvents, err := persistence.ToESEvents("User", userID, domainEvents,
	persistence.WithTraceID(span.SpanContext().TraceID().String()),
	persistence.WithCorrelationID(correlationID),
	persistence.WithCausationID(commandID),
)
Propagating Trace Context in Consumers¶
When a consumer processes an event, extract the trace fields and propagate them to any downstream operations:
func (p *Projection) Handle(ctx context.Context, pe store.PersistedEvent) error {
	// Extract trace context from the incoming event
	if pe.TraceID.Valid {
		ctx = withTraceID(ctx, pe.TraceID.String)
	}
	if pe.CorrelationID.Valid {
		ctx = withCorrelationID(ctx, pe.CorrelationID.String)
	}

	// Process the event with trace context propagated
	domainEvent, err := persistence.FromESEvent(pe)
	if err != nil {
		return err
	}

	switch e := domainEvent.(type) {
	case v1.UserRegistered:
		return p.onUserRegistered(ctx, pe, e)
	}
	return nil
}
Correlation Patterns¶
Command → Events: Set the CorrelationID on all events produced by a command to the same value. This lets you trace all events back to the originating command.
// In a command handler
storeEvents, _ := persistence.ToESEvents("Order", orderID, domainEvents,
	persistence.WithCorrelationID(cmd.CommandID),
	persistence.WithCausationID(cmd.CommandID),
)
Event → Events (reactions): When a consumer produces new events in response to an event, set the CausationID to the triggering event's ID and carry forward the CorrelationID.
// In a reactive consumer
func (h *ReactionHandler) Handle(ctx context.Context, pe store.PersistedEvent) error {
	newEvents, _ := persistence.ToESEvents("Notification", notifID, domainEvents,
		persistence.WithCorrelationID(pe.CorrelationID.String),
		persistence.WithCausationID(pe.EventID.String()),
	)
	// ...
}
Metrics¶
What to Monitor¶
Pupsourcing does not ship a metrics library, but it exposes the data points you need. Collect these metrics using your preferred observability stack (Prometheus, OpenTelemetry, Datadog, etc.).
Consumer Lag¶
The single most important metric. Consumer lag is the difference between the latest global position in the event store and the consumer's checkpoint position.
// Query the latest global position
var latestPosition int64
if err := db.QueryRowContext(ctx,
	`SELECT COALESCE(MAX(global_position), 0) FROM events`,
).Scan(&latestPosition); err != nil {
	return err
}

// Query the consumer's checkpoint
var checkpointPosition int64
if err := db.QueryRowContext(ctx,
	`SELECT COALESCE(position, 0) FROM consumer_checkpoints WHERE consumer_id = $1`,
	consumerID,
).Scan(&checkpointPosition); err != nil {
	return err
}

// Expose the lag as a Prometheus gauge
lag := latestPosition - checkpointPosition
consumerLag.WithLabelValues(consumerID).Set(float64(lag))
Event Append Rate¶
Track the rate of events being appended to the store. A sudden drop may indicate application issues; a spike may signal unusual activity.
// Instrument your event store wrapper
func (s *InstrumentedStore) AppendEvents(
	ctx context.Context,
	aggType, aggID string,
	events []store.Event,
	cond store.AppendCondition,
) ([]store.PersistedEvent, error) {
	result, err := s.inner.AppendEvents(ctx, aggType, aggID, events, cond)
	if err == nil {
		eventsAppended.WithLabelValues(aggType).Add(float64(len(events)))
	}
	return result, err
}
Consumer Processing Rate¶
Track how many events each consumer processes per second and how long each batch takes:
func (p *InstrumentedProjection) Handle(ctx context.Context, pe store.PersistedEvent) error {
	start := time.Now()
	err := p.inner.Handle(ctx, pe)
	duration := time.Since(start)

	eventProcessingDuration.WithLabelValues(p.name).Observe(duration.Seconds())
	if err != nil {
		eventProcessingErrors.WithLabelValues(p.name).Inc()
	} else {
		eventsProcessed.WithLabelValues(p.name).Inc()
	}
	return err
}
Worker Heartbeat¶
The worker uses heartbeats to claim ownership of consumer instances. Monitor heartbeat freshness to detect worker failures:
// Query stale heartbeats
rows, err := db.QueryContext(ctx, `
	SELECT consumer_id, last_heartbeat
	FROM consumer_instances
	WHERE last_heartbeat < NOW() - INTERVAL '30 seconds'
`)
if err != nil {
	return err
}
defer rows.Close()
Example Prometheus Metrics¶
import "github.com/prometheus/client_golang/prometheus"
var (
	consumerLag = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pupsourcing_consumer_lag",
		Help: "Number of events behind the latest global position",
	}, []string{"consumer_id"})

	eventsAppended = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "pupsourcing_events_appended_total",
		Help: "Total events appended to the store",
	}, []string{"aggregate_type"})

	eventsProcessed = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "pupsourcing_events_processed_total",
		Help: "Total events processed by consumers",
	}, []string{"consumer"})

	eventProcessingDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "pupsourcing_event_processing_duration_seconds",
		Help:    "Time to process a single event",
		Buckets: prometheus.DefBuckets,
	}, []string{"consumer"})

	eventProcessingErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "pupsourcing_event_processing_errors_total",
		Help: "Total event processing errors",
	}, []string{"consumer"})
)

func init() {
	prometheus.MustRegister(
		consumerLag,
		eventsAppended,
		eventsProcessed,
		eventProcessingDuration,
		eventProcessingErrors,
	)
}
Best Practices¶
- Use structured logging. log/slog is recommended — it integrates naturally with the store.Logger interface and produces machine-parseable output.
- Propagate trace context consistently. Set TraceID, CorrelationID, and CausationID on every event. This makes debugging event chains across services straightforward.
- Monitor consumer lag as your primary health indicator. Rising lag means consumers are falling behind. Alert when lag exceeds a threshold (e.g., 1000 events or 5 minutes of wall time).
- Alert on consecutive processing failures. A single failure may be transient, but repeated failures on the same event indicate a bug or a poison message.
- Don't log PII. Events may contain sensitive data. Ensure your logger does not inadvertently output personally identifiable information. Log event metadata (type, ID, position) but not payloads.
- Use labeled metrics. Tag metrics with consumer_id, aggregate_type, or other dimensions to isolate issues to specific consumers or aggregates.
- Correlate logs with traces. Include TraceID and CorrelationID in log entries so you can jump from a log line to the full trace in your observability platform.