Getting Started¶
This guide walks you through building your first event-sourced application with pupsourcing — from installation to running a consumer that processes events in real time.
Prerequisites¶
Before you begin, make sure you have:
- Go 1.24 or later
- PostgreSQL 12 or later (running and accessible)
- Basic familiarity with Go modules and database/sql
Note
Pupsourcing uses database/sql transactions for all store operations. Every append and read goes through a *sql.Tx — this ensures your domain events and any side effects (like projection updates) are committed atomically.
Installation¶
Pupsourcing is split into two packages: store (the event store) and worker (distributed consumer processing). Install both:
```shell
go get github.com/pupsourcing/store
go get github.com/pupsourcing/worker
```
You will also need a PostgreSQL driver:
```shell
go get github.com/lib/pq
```
Database Setup¶
Pupsourcing provides migration generators that output plain SQL files. You run a small Go program to generate them, then apply the SQL with your preferred migration tool.
Generate Store Migrations¶
The store needs two tables: one for events and one for aggregate version tracking.
```go
package main

import (
	"log"

	"github.com/pupsourcing/store/migrations"
)

func main() {
	config := &migrations.Config{
		OutputFolder:        "./db/migrations",
		OutputFilename:      "001_event_store.sql",
		EventsTable:         "events",
		AggregateHeadsTable: "aggregate_heads",
	}
	if err := migrations.GeneratePostgres(config); err != nil {
		log.Fatal(err)
	}
}
```
Generate Worker Migrations¶
The worker needs tables for node coordination, consumer assignment, and checkpoint tracking.
```go
package main

import (
	"log"

	workermigrations "github.com/pupsourcing/worker/migrations"
)

func main() {
	config := &workermigrations.Config{
		OutputFolder:             "./db/migrations",
		OutputFilename:           "002_worker.sql",
		WorkerNodesTable:         "worker_nodes",
		ConsumerAssignmentsTable: "consumer_assignments",
		ConsumerCheckpointsTable: "consumer_checkpoints",
	}
	if err := workermigrations.GeneratePostgres(config); err != nil {
		log.Fatal(err)
	}
}
```
Apply Migrations¶
Once generated, apply the SQL files using psql or any migration tool you prefer (goose, golang-migrate, etc.):
```shell
psql -d myapp -f db/migrations/001_event_store.sql
psql -d myapp -f db/migrations/002_worker.sql
```
Tip
You can combine both migration generators into a single Go program and run them with go run. Keep them as a tool in your project so every developer can regenerate the schema.
Creating the Event Store¶
Connect to PostgreSQL and create the store instance:
```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"

	storepostgres "github.com/pupsourcing/store/postgres"
)

func main() {
	db, err := sql.Open("postgres", "postgres://localhost:5432/myapp?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

	// eventStore is ready to use
	_ = eventStore
}
```
DefaultStoreConfig() uses sensible defaults for table names and settings. You can customize it with options like WithEventsTable, WithAggregateHeadsTable, WithNotifyChannel, or WithLogger if needed.
Appending Your First Event¶
Let's model a user registration. Define a domain event struct, marshal it to JSON, and append it to the store.
```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"log"
	"time"

	"github.com/google/uuid"
	_ "github.com/lib/pq"

	"github.com/pupsourcing/store"
	storepostgres "github.com/pupsourcing/store/postgres"
)

type UserRegistered struct {
	Email string `json:"email"`
	Name  string `json:"name"`
}

func main() {
	db, err := sql.Open("postgres", "postgres://localhost:5432/myapp?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

	ctx := context.Background()
	userID := uuid.New().String()

	payload, err := json.Marshal(UserRegistered{
		Email: "alice@example.com",
		Name:  "Alice",
	})
	if err != nil {
		log.Fatal(err)
	}

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback()

	result, err := eventStore.Append(ctx, tx, store.NoStream(), []store.Event{{
		AggregateType: "User",
		AggregateID:   userID,
		EventID:       uuid.New(),
		EventType:     "UserRegistered",
		EventVersion:  1,
		Payload:       payload,
		Metadata:      []byte("{}"),
		CreatedAt:     time.Now(),
	}})
	if err != nil {
		log.Fatal(err)
	}

	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}

	log.Printf("Event appended — global position: %d", result.GlobalPositions[0])
}
```
Warning
Always call tx.Commit() after a successful append. Without it, the transaction rolls back and your events are lost. The defer tx.Rollback() is a safety net — it's a no-op if the transaction was already committed.
Why NoStream()?¶
The second argument to Append is an expected version. It controls optimistic concurrency:
- `store.NoStream()` — asserts the aggregate stream does not yet exist. Use this when creating a new aggregate.
- `store.Exact(version)` — asserts the stream is at exactly this version. Use this when updating an existing aggregate.
- `store.Any()` — skips the version check entirely. Use with caution.
Reading Events Back¶
Use ReadAggregateStream to load all events for an aggregate:
```go
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	log.Fatal(err)
}
defer tx.Rollback()

stream, err := eventStore.ReadAggregateStream(ctx, tx, "User", userID, nil, nil)
if err != nil {
	log.Fatal(err)
}

log.Printf("Stream version: %d (%d events)", stream.Version(), stream.Len())
for _, event := range stream.Events {
	log.Printf("  [v%d] %s — %s", event.AggregateVersion, event.EventType, string(event.Payload))
}
```
The Stream type provides helper methods:
- `Version()` returns the latest aggregate version
- `Len()` returns the number of events
- `IsEmpty()` returns true if no events exist
Note
The last two nil arguments to ReadAggregateStream are optional fromVersion and toVersion pointers, which let you read a specific range of the stream — useful when combined with snapshots.
Updating an Aggregate¶
To append more events to an existing aggregate, read the current version first and use Exact(version) for optimistic concurrency:
```go
type UserEmailChanged struct {
	OldEmail string `json:"old_email"`
	NewEmail string `json:"new_email"`
}

// Read current state
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	log.Fatal(err)
}
defer tx.Rollback()

stream, err := eventStore.ReadAggregateStream(ctx, tx, "User", userID, nil, nil)
if err != nil {
	log.Fatal(err)
}
currentVersion := stream.Version()

// Append with version check
payload, err := json.Marshal(UserEmailChanged{
	OldEmail: "alice@example.com",
	NewEmail: "alice@newdomain.com",
})
if err != nil {
	log.Fatal(err)
}

_, err = eventStore.Append(ctx, tx, store.Exact(currentVersion), []store.Event{{
	AggregateType: "User",
	AggregateID:   userID,
	EventID:       uuid.New(),
	EventType:     "UserEmailChanged",
	EventVersion:  1,
	Payload:       payload,
	Metadata:      []byte("{}"),
	CreatedAt:     time.Now(),
}})
if err != nil {
	log.Fatal(err)
}

if err := tx.Commit(); err != nil {
	log.Fatal(err)
}
```
If another transaction appended events between your read and write, Append returns store.ErrOptimisticConcurrency. This is the foundation of conflict detection in event sourcing — you never silently overwrite someone else's changes.
Building a Consumer¶
A consumer processes events to build read models, send notifications, or trigger side effects. Implement the Consumer interface:
```go
package main

import (
	"context"
	"database/sql"
	"log"

	"github.com/pupsourcing/store"
)

type UserCountProjection struct{}

func (p *UserCountProjection) Name() string {
	return "user_count"
}

func (p *UserCountProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	if event.EventType != "UserRegistered" {
		return nil
	}
	_, err := tx.ExecContext(ctx,
		`INSERT INTO user_stats (metric, value) VALUES ('total_users', 1)
		 ON CONFLICT (metric) DO UPDATE SET value = user_stats.value + 1`)
	if err != nil {
		return err
	}
	log.Printf("Processed %s — updated user count", event.EventType)
	return nil
}
```
The worker calls Handle inside a transaction that also updates the consumer's checkpoint. If Handle returns an error, the entire batch is rolled back and retried — your projection stays consistent with the event stream.
Tip
Create the user_stats table before running the consumer:
```sql
CREATE TABLE IF NOT EXISTS user_stats (
    metric TEXT PRIMARY KEY,
    value  INTEGER NOT NULL DEFAULT 0
);
```
Running the Worker¶
The worker coordinates consumer processing across one or more nodes. Wire everything together in a main() function:
```go
package main

import (
	"context"
	"database/sql"
	"log"
	"os"
	"os/signal"
	"time"

	_ "github.com/lib/pq"

	"github.com/pupsourcing/store"
	"github.com/pupsourcing/store/consumer"
	storepostgres "github.com/pupsourcing/store/postgres"
	"github.com/pupsourcing/worker"
)

type UserCountProjection struct{}

func (p *UserCountProjection) Name() string {
	return "user_count"
}

func (p *UserCountProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	if event.EventType != "UserRegistered" {
		return nil
	}
	_, err := tx.ExecContext(ctx,
		`INSERT INTO user_stats (metric, value) VALUES ('total_users', 1)
		 ON CONFLICT (metric) DO UPDATE SET value = user_stats.value + 1`)
	return err
}

func main() {
	db, err := sql.Open("postgres", "postgres://localhost:5432/myapp?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	eventStore := storepostgres.NewStore(storepostgres.DefaultStoreConfig())

	consumers := []consumer.Consumer{
		&UserCountProjection{},
	}

	w := worker.New(db, eventStore, consumers,
		worker.WithBatchSize(100),
		worker.WithPollInterval(500*time.Millisecond),
	)

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	log.Println("Starting worker...")
	if err := w.Start(ctx); err != nil {
		log.Fatal(err)
	}
}
```
Start blocks until the context is cancelled (e.g., by Ctrl+C). The worker polls for new events, dispatches them to consumers in batches, and tracks progress via checkpoints. You can run multiple worker instances against the same database for horizontal scaling.
Common Patterns¶
Appending Multiple Events in One Call¶
You can append several events to the same aggregate atomically:
```go
events := []store.Event{
	{
		AggregateType: "User",
		AggregateID:   userID,
		EventID:       uuid.New(),
		EventType:     "UserRegistered",
		EventVersion:  1,
		Payload:       registeredPayload,
		Metadata:      []byte("{}"),
		CreatedAt:     time.Now(),
	},
	{
		AggregateType: "User",
		AggregateID:   userID,
		EventID:       uuid.New(),
		EventType:     "WelcomeEmailScheduled",
		EventVersion:  1,
		Payload:       emailPayload,
		Metadata:      []byte("{}"),
		CreatedAt:     time.Now(),
	},
}

result, err := eventStore.Append(ctx, tx, store.NoStream(), events)
```
All events in a single Append call are written atomically — either all succeed or none do.
Handling Version Conflicts¶
When using Exact(version), be prepared for concurrency conflicts:
```go
import "errors"

result, err := eventStore.Append(ctx, tx, store.Exact(currentVersion), events)
if errors.Is(err, store.ErrOptimisticConcurrency) {
	tx.Rollback()
	// Re-read the stream to get the latest version
	// Re-apply your domain logic
	// Retry the append
}
```
Warning
After an ErrOptimisticConcurrency error, you must roll back the transaction and start a new one. The failed transaction cannot be reused.
Scoped Consumers¶
If your consumer only cares about specific aggregate types, implement ScopedConsumer. The worker will only dispatch matching events:
```go
import (
	"context"
	"database/sql"

	"github.com/pupsourcing/store"
	"github.com/pupsourcing/store/consumer"
)

// Verify interface compliance at compile time
var _ consumer.ScopedConsumer = (*OrderProjection)(nil)

type OrderProjection struct{}

func (p *OrderProjection) Name() string {
	return "order_projection"
}

func (p *OrderProjection) AggregateTypes() []string {
	return []string{"Order"}
}

func (p *OrderProjection) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
	// Only receives events where AggregateType == "Order"
	switch event.EventType {
	case "OrderPlaced":
		// ...
	case "OrderShipped":
		// ...
	}
	return nil
}
```
ScopedConsumer extends Consumer with an AggregateTypes() method. This is both a performance optimization (fewer events dispatched) and a way to make your consumer's scope explicit.
Next Steps¶
Now that you have a working event-sourced application, explore the rest of the documentation:
- Core Concepts — understand the principles behind event sourcing with pupsourcing
- Event Store — deep dive into store configuration, reading strategies, and advanced patterns
- Worker — learn about distributed processing, scaling, and dispatcher strategies
- Encryption — encrypt sensitive event payloads at rest