Getting Started¶
This guide covers installation, setup, and creating your first event-sourced application with pupsourcing.
Prerequisites¶
- Go 1.23 or later
- PostgreSQL 12+ (or SQLite for development/testing)
Installation¶
go get github.com/getpup/pupsourcing
Choose a database driver:
# PostgreSQL (recommended for production)
go get github.com/lib/pq
# SQLite (ideal for development/testing)
go get modernc.org/sqlite
# MySQL/MariaDB
go get github.com/go-sql-driver/mysql
Quick Start¶
1. Generate Database Schema¶
Generate SQL migrations for your chosen database:
go run github.com/getpup/pupsourcing/cmd/migrate-gen -output migrations
Or use go generate:
//go:generate go run github.com/getpup/pupsourcing/cmd/migrate-gen -output migrations
This creates SQL migration files with:
- Events table with proper indexes
- Aggregate heads table for version tracking
- Projection checkpoints table
2. Apply Schema Migrations¶
Apply the generated migrations using your preferred migration tool (golang-migrate, goose, etc.).
3. Initialize Database Connection¶
import (
"database/sql"
"log"
_ "github.com/lib/pq" // registers the "postgres" driver
)
db, err := sql.Open("postgres",
"host=localhost port=5432 user=postgres password=postgres dbname=myapp sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
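Note that `sql.Open` may only validate its arguments without opening a connection, so a wrong host or password often surfaces later, on the first query. The following is a minimal sketch of a startup check that retries a ping a few times. The helper names `waitForDB` and `simulateStartup` are hypothetical and not part of pupsourcing; in a real program you would pass `db.PingContext` as the `ping` argument.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForDB calls ping until it succeeds, the attempts run out, or ctx is done.
// With database/sql, ping would typically be db.PingContext.
func waitForDB(ctx context.Context, ping func(context.Context) error, attempts int, delay time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = ping(ctx); lastErr == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return fmt.Errorf("database not reachable after %d attempts: %w", attempts, lastErr)
}

// simulateStartup stands in for a real database (hypothetical): pings fail
// until call number readyOn. It returns how many pings were made.
func simulateStartup(readyOn, attempts int) (int, error) {
	calls := 0
	ping := func(context.Context) error {
		calls++
		if calls < readyOn {
			return errors.New("connection refused")
		}
		return nil
	}
	err := waitForDB(context.Background(), ping, attempts, time.Millisecond)
	return calls, err
}

func main() {
	calls, err := simulateStartup(3, 5)
	fmt.Println("pings:", calls, "error:", err)
}
```

With a real connection, the call would look like `waitForDB(ctx, db.PingContext, 5, time.Second)`, placed right after `sql.Open`.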
4. Create Event Store¶
import (
"github.com/getpup/pupsourcing/es/adapters/postgres"
)
store := postgres.NewStore(postgres.DefaultStoreConfig())
5. Append Your First Event¶
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/getpup/pupsourcing/es"
"github.com/google/uuid"
)
// Define your event payload
type UserCreated struct {
Email string `json:"email"`
Name string `json:"name"`
}
// Marshal to JSON
payload, err := json.Marshal(UserCreated{
Email: "alice@example.com",
Name: "Alice Smith",
})
if err != nil {
log.Fatal(err)
}
// Create event
aggregateID := uuid.New().String() // In practice, this comes from your domain/business logic
events := []es.Event{
{
BoundedContext: "Identity", // Required: scope events to bounded context
AggregateType: "User",
AggregateID: aggregateID,
EventID: uuid.New(),
EventType: "UserCreated",
EventVersion: 1,
Payload: payload,
Metadata: []byte(`{}`),
CreatedAt: time.Now(),
},
}
// Append in a transaction
ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
log.Fatal(err)
}
defer tx.Rollback()
result, err := store.Append(ctx, tx, es.NoStream(), events)
if err != nil {
log.Fatal(err)
}
if err := tx.Commit(); err != nil {
log.Fatal(err)
}
fmt.Printf("Event appended at position: %d\n", result.GlobalPositions[0])
fmt.Printf("Aggregate version: %d\n", result.ToVersion())
6. Read Events¶
// Read all events for an aggregate
aggregateID := "550e8400-e29b-41d4-a716-446655440000" // Use the actual aggregate ID
tx, err := db.BeginTx(ctx, nil)
if err != nil {
log.Fatal(err)
}
defer tx.Rollback()
stream, err := store.ReadAggregateStream(ctx, tx, "Identity", "User", aggregateID, nil, nil)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Aggregate version: %d\n", stream.Version())
for _, event := range stream.Events {
fmt.Printf("Event: %s (version %d)\n", event.EventType, event.AggregateVersion)
}
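The loop above prints only event metadata. To use an event you usually decode its JSON payload back into the Go type it was marshalled from, selected by event type. The sketch below shows one way to do that. `storedEvent` is a hypothetical stand-in that carries only the two fields used here (`EventType` and `Payload`), and `decode` is an illustrative helper, not a pupsourcing function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// storedEvent is a hypothetical stand-in for a persisted event; it carries
// only the fields this sketch needs.
type storedEvent struct {
	EventType string
	Payload   []byte
}

// UserCreated mirrors the payload defined in step 5.
type UserCreated struct {
	Email string `json:"email"`
	Name  string `json:"name"`
}

// decode maps an event type name to its payload struct and unmarshals into it.
func decode(e storedEvent) (any, error) {
	switch e.EventType {
	case "UserCreated":
		var p UserCreated
		if err := json.Unmarshal(e.Payload, &p); err != nil {
			return nil, fmt.Errorf("decode %s: %w", e.EventType, err)
		}
		return p, nil
	default:
		return nil, fmt.Errorf("unknown event type %q", e.EventType)
	}
}

func main() {
	e := storedEvent{
		EventType: "UserCreated",
		Payload:   []byte(`{"email":"alice@example.com","name":"Alice Smith"}`),
	}
	p, err := decode(e)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if u, ok := p.(UserCreated); ok {
		fmt.Println("created:", u.Name, u.Email)
	}
}
```

Returning an error for unknown event types, rather than skipping them silently, makes a missing `case` visible as soon as a new event type is introduced.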
7. Create a Projection¶
Create a scoped projection that only receives User events:
import (
"context"
"fmt"
"github.com/getpup/pupsourcing/es"
"github.com/getpup/pupsourcing/es/projection"
)
type UserCountProjection struct {
count int
}
func (p *UserCountProjection) Name() string {
return "user_count"
}
// AggregateTypes makes this a scoped projection
func (p *UserCountProjection) AggregateTypes() []string {
return []string{"User"} // Only receives User events
}
// BoundedContexts filters by context - receives only Identity context events
func (p *UserCountProjection) BoundedContexts() []string {
return []string{"Identity"}
}
func (p *UserCountProjection) Handle(_ context.Context, event es.PersistedEvent) error {
if event.EventType == "UserCreated" {
p.count++
fmt.Printf("User count: %d\n", p.count)
}
return nil
}
8. Run the Projection¶
proj := &UserCountProjection{}
config := projection.DefaultProcessorConfig()
// Use adapter-specific processor
store := postgres.NewStore(postgres.DefaultStoreConfig())
processor := postgres.NewProcessor(db, store, &config)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run until context is cancelled
if err := processor.Run(ctx, proj); err != nil {
log.Printf("projection stopped: %v", err)
}
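In a long-running service the context is usually cancelled by an OS signal rather than by hand. The sketch below wires `signal.NotifyContext` from the standard library to any blocking run function. It assumes, without confirmation from the pupsourcing docs, that the run function returns an error matching `context.Canceled` once its context is cancelled. `runUntilSignal` and `simulateShutdown` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// runUntilSignal runs fn with a context that is cancelled on SIGINT or SIGTERM.
// Cancellation is treated as a clean shutdown, any other error is returned.
func runUntilSignal(parent context.Context, fn func(context.Context) error) error {
	ctx, stop := signal.NotifyContext(parent, os.Interrupt, syscall.SIGTERM)
	defer stop()

	err := fn(ctx)
	if errors.Is(err, context.Canceled) {
		return nil
	}
	return err
}

// simulateShutdown stands in for a processor (hypothetical): the run function
// blocks until its context is cancelled, then returns the context error.
func simulateShutdown() error {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // pretend shutdown was already requested so the demo terminates
	return runUntilSignal(parent, func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
}

func main() {
	fmt.Println("shutdown error:", simulateShutdown())
}
```

With the processor from this step, the call would be roughly `runUntilSignal(context.Background(), func(ctx context.Context) error { return processor.Run(ctx, proj) })`.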
Complete Example¶
See the complete working example that ties everything together.
Next Steps¶
- Learn Core Concepts to understand event sourcing with pupsourcing
- Explore Projections & Scaling to build read models
- See Scaling Guide for production deployments
- Browse Examples for more patterns
Common Patterns¶
Appending Multiple Events¶
userID := uuid.New().String()
events := []es.Event{
{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: userID,
EventID: uuid.New(),
EventType: "UserCreated",
EventVersion: 1,
Payload: payload1,
Metadata: []byte(`{}`),
CreatedAt: time.Now(),
},
{
BoundedContext: "Identity",
AggregateType: "User",
AggregateID: userID, // Same aggregate
EventID: uuid.New(),
EventType: "EmailVerified",
EventVersion: 1,
Payload: payload2,
Metadata: []byte(`{}`),
CreatedAt: time.Now(),
},
}
// Both events appended atomically
result, err := store.Append(ctx, tx, es.NoStream(), events)
Handling Version Conflicts¶
result, err := store.Append(ctx, tx, es.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
// Another transaction modified this aggregate
// Retry the entire operation
tx.Rollback()
// ... retry logic
}
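The retry logic can be factored into a small helper that repeats the whole operation only when the failure is a version conflict. This is a sketch under stated assumptions: `errConflict` is a hypothetical stand-in for the store's optimistic concurrency error, and `retryOnConflict` is not a pupsourcing function. Each attempt inside `op` should begin a fresh transaction, re-read the aggregate, recompute the events, and append with the newly read version.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical stand-in for the store's optimistic
// concurrency error.
var errConflict = errors.New("optimistic concurrency conflict")

// retryOnConflict runs op up to maxAttempts times. It retries only when op
// fails with errConflict; success or any other error is returned immediately.
func retryOnConflict(maxAttempts int, op func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = op()
		if !errors.Is(err, errConflict) {
			return err // nil on success, or a non-retryable error
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	attempts := 0
	err := retryOnConflict(3, func() error {
		attempts++
		if attempts < 3 {
			// Simulate another writer winning the race twice.
			return fmt.Errorf("append: %w", errConflict)
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "error:", err)
}
```

Bounding the number of attempts keeps a heavily contended aggregate from looping forever; the final error still wraps the conflict so callers can detect it with `errors.Is`.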
Reading Event Ranges¶
aggregateID := "550e8400-e29b-41d4-a716-446655440000" // Use the ID of an existing aggregate
// Read from version 5 onwards (e.g., after loading a snapshot)
fromVersion := int64(5)
stream, err := store.ReadAggregateStream(ctx, tx, "Identity", "User", aggregateID, &fromVersion, nil)
// Read a specific range (reuse the variables declared above)
toVersion := int64(10)
stream, err = store.ReadAggregateStream(ctx, tx, "Identity", "User", aggregateID, &fromVersion, &toVersion)
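To illustrate the snapshot case, the sketch below rebuilds state by applying only the events that follow a snapshot. It assumes the snapshot was taken at version 4, that the range read starts at version 5 inclusive, and that aggregate versions increase by one per event; none of this is confirmed by the text above. `versionedEvent`, `userState`, and `replay` are hypothetical names.

```go
package main

import "fmt"

// versionedEvent is a hypothetical stand-in carrying only the fields used here.
type versionedEvent struct {
	AggregateVersion int64
	EventType        string
}

// userState is the state restored from a snapshot and advanced by events.
type userState struct {
	Version       int64
	EmailVerified bool
}

// replay applies events on top of state. It rejects gaps so that a snapshot
// combined with the wrong event range is detected instead of silently applied.
func replay(state userState, events []versionedEvent) (userState, error) {
	for _, e := range events {
		if e.AggregateVersion != state.Version+1 {
			return state, fmt.Errorf("expected version %d, got %d",
				state.Version+1, e.AggregateVersion)
		}
		switch e.EventType {
		case "EmailVerified":
			state.EmailVerified = true
		}
		state.Version = e.AggregateVersion
	}
	return state, nil
}

func main() {
	snapshot := userState{Version: 4} // state as of version 4
	tail := []versionedEvent{
		{AggregateVersion: 5, EventType: "EmailVerified"},
	}
	state, err := replay(snapshot, tail)
	fmt.Printf("%+v %v\n", state, err)
}
```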
Troubleshooting¶
Connection Errors¶
Ensure PostgreSQL is running. To start a local instance with Docker:
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres:16
Migration Issues¶
Verify that the migrations were applied by describing the tables in psql:
\d events
\d aggregate_heads
\d projection_checkpoints
Event Not Appearing¶
Check that the transaction was committed:
tx, _ := db.BeginTx(ctx, nil)
store.Append(ctx, tx, es.NoStream(), events)
tx.Commit() // Don't forget this!
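One way to make a forgotten commit harder is to wrap the begin, commit, and rollback sequence in a helper. The sketch below is generic over anything with `Commit` and `Rollback` methods, which `*sql.Tx` satisfies. `withTx`, `transaction`, `fakeTx`, and `errDemo` are hypothetical names used for illustration; they are not part of pupsourcing.

```go
package main

import (
	"errors"
	"fmt"
)

// transaction is the subset of *sql.Tx that the helper needs.
type transaction interface {
	Commit() error
	Rollback() error
}

// withTx begins a transaction, runs fn, and commits if fn succeeds.
// If fn fails, it rolls back and returns fn's error (joined with the
// rollback error, if any).
func withTx[T transaction](begin func() (T, error), fn func(T) error) error {
	tx, err := begin()
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	if err := fn(tx); err != nil {
		return errors.Join(err, tx.Rollback())
	}
	return tx.Commit()
}

// fakeTx records what happened so the sketch runs without a database.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

// errDemo simulates a failed append.
var errDemo = errors.New("append failed")

func main() {
	ok := &fakeTx{}
	err := withTx(func() (*fakeTx, error) { return ok, nil },
		func(*fakeTx) error { return nil })
	fmt.Println(err, ok.committed, ok.rolledBack)

	bad := &fakeTx{}
	err = withTx(func() (*fakeTx, error) { return bad, nil },
		func(*fakeTx) error { return errDemo })
	fmt.Println(err, bad.committed, bad.rolledBack)
}
```

With `database/sql`, `begin` would be `func() (*sql.Tx, error) { return db.BeginTx(ctx, nil) }` and `fn` would call `store.Append` with the transaction it receives.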
Projection Not Processing¶
Verify events exist:
SELECT COUNT(*) FROM events;
Check projection checkpoint:
SELECT * FROM projection_checkpoints WHERE projection_name = 'your_projection';
Resources¶
- Core Concepts - Understand the fundamentals
- API Reference - Complete API documentation
- Examples - Working code examples