Transactional Outbox
Romancy provides built-in support for the Transactional Outbox Pattern, ensuring that events are published reliably and atomically with workflow execution.
What is the Transactional Outbox Pattern?
The Transactional Outbox Pattern solves the dual-write problem in distributed systems:
Problem:
// ❌ This can fail!
db.SaveOrder(order) // Step 1: Success
broker.PublishEvent(event) // Step 2: Fails! ← Event lostIf the broker is down or the network fails, the event is lost even though the order was saved.
Solution:
// ✅ Atomic operation
tx.Begin()
db.SaveOrder(order) // Step 1: Save order
db.SaveToOutboxTable(event) // Step 2: Save event
tx.Commit()
// Later: Background process publishes events from outboxBoth operations succeed or fail together. A background Outbox Relayer publishes events asynchronously.
Quick Start
1. Send Events Transactionally
Use SendEventTransactional() inside activities:
package main
import (
"context"
"fmt"
"github.com/i2y/romancy"
)
type CreateOrderResult struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
}
var createOrder = romancy.DefineActivity("create_order",
func(ctx context.Context, orderID string, amount float64) (CreateOrderResult, error) {
// Business logic
fmt.Printf("Creating order %s\n", orderID)
// Get workflow context for transactional event sending
wfCtx := romancy.GetWorkflowContext(ctx)
// Send event transactionally
// This writes to the outbox table in the same transaction
err := romancy.SendEventTransactional(wfCtx,
"order.created",
"order-service",
map[string]any{
"order_id": orderID,
"amount": amount,
"status": "pending",
},
)
if err != nil {
return CreateOrderResult{}, err
}
return CreateOrderResult{OrderID: orderID, Status: "created"}, nil
},
)2. Enable Outbox Relayer
Configure the app with outbox enabled:
package main
import (
"context"
"log"
"github.com/i2y/romancy"
)
func main() {
app := romancy.NewApp(
romancy.WithDatabase("orders.db"),
romancy.WithWorkerID("worker-1"),
romancy.WithOutbox(true), // Enable outbox relayer
romancy.WithBrokerURL("http://localhost:8080/events"), // CloudEvents endpoint
)
ctx := context.Background()
if err := app.Start(ctx); err != nil {
log.Fatal(err)
}
defer app.Shutdown(ctx)
// Your workflow code here
}That’s it! Events are now published reliably.
Custom Database Operations (Advanced)
For true atomic transactions between your business logic and event publishing, you can access Romancy’s managed database session using ctx.Session().
Why Use ctx.Session()?
The basic approach (SendEventTransactional() inside activities) uses Romancy’s internal transaction to write to the outbox table. This is sufficient for most use cases where your business logic is:
- Read-only operations
- External API calls
- Romancy’s own workflow history
However, if you need to write to your own database and guarantee atomicity with event publishing, use ctx.Session():
Example: Payment Processing with Custom Database Operations
package main
import (
"context"
"fmt"
"github.com/i2y/romancy"
)
// Payment model (must be in the same database as Romancy)
type Payment struct {
OrderID string
Amount float64
Status string
}
type ProcessPaymentResult struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
}
var processPayment = romancy.DefineActivity("process_payment",
func(ctx context.Context, orderID string, amount float64) (ProcessPaymentResult, error) {
// Get workflow context
wfCtx := romancy.GetWorkflowContext(ctx)
// Access Romancy-managed session (same database as Romancy)
session := wfCtx.Session()
// Your business logic (same transaction as Romancy)
payment := Payment{
OrderID: orderID,
Amount: amount,
Status: "completed",
}
// Insert payment record
_, err := session.Exec(
"INSERT INTO payments (order_id, amount, status) VALUES (?, ?, ?)",
payment.OrderID, payment.Amount, payment.Status,
)
if err != nil {
return ProcessPaymentResult{}, err
}
// Romancy event (same transaction)
err = romancy.SendEventTransactional(wfCtx,
"payment.processed",
"payment-service",
map[string]any{"order_id": orderID, "amount": amount},
)
if err != nil {
return ProcessPaymentResult{}, err
}
// Romancy automatically commits (or rolls back on exception):
// 1. Your payment record (same database)
// 2. Romancy's outbox event
// 3. Romancy's workflow history
return ProcessPaymentResult{OrderID: orderID, Status: "processed"}, nil
},
)Automatic Transaction Management
Activities automatically manage transactions:
type PaymentResult struct {
OrderID string `json:"order_id"`
}
var processPayment = romancy.DefineActivity("process_payment",
func(ctx context.Context, orderID string, amount float64) (PaymentResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
// Access Romancy-managed session
session := wfCtx.Session()
// Your business logic
_, err := session.Exec("INSERT INTO payments ...")
if err != nil {
return PaymentResult{}, err // Romancy automatically rolls back
}
// Events in same transaction
err = romancy.SendEventTransactional(wfCtx, ...)
if err != nil {
return PaymentResult{}, err // Romancy automatically rolls back
}
// Romancy automatically commits on success
return PaymentResult{OrderID: orderID}, nil
},
)How it works:
When you access ctx.Session() inside an activity, Romancy automatically:
- Provides managed session: Returns the current transaction’s session
- Single transaction: All operations use the same session
- Commits or rolls back: Commits on success, rolls back on error
This results in:
- Single Transaction: Your payment data + Romancy’s outbox + history commit together
- Atomic Guarantee: Either all succeed or all fail (no data inconsistency)
- Rollback Safety: If any operation fails, everything rolls back
Key Points
- ✅ Atomic Guarantee: Your data + outbox event + history commit together
- ✅ Single Transaction: All operations use Romancy’s managed session
- ✅ Automatic Management: Romancy handles begin/commit/rollback
- ✅ Rollback Safety: If any operation fails, everything rolls back
- ⚠️ Database Requirement: See Database Requirements below
When to Use ctx.Session()
| Use Case | Recommended Approach |
|---|---|
| Writing to your own database + events | ctx.Session() - Ensures atomicity |
| Read-only operations | Standard activity - No custom DB access needed |
| External API calls | Standard activity - No database transaction needed |
Advanced Transaction Control
Activities are automatically transactional (see Automatic Transaction Management above). However, in advanced scenarios, you may need finer control over transaction boundaries using ctx.Transaction().
When to Use Advanced Transaction Control
- Nested transactions (savepoints) - Partial rollback within activities
- Workflow-level transactions - Manual control in workflow functions (not activities)
Nested Transactions (Savepoints) in Activities
Within an activity, you can use ctx.Transaction() for savepoints:
type OrderWithPaymentResult struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
}
var processOrderWithOptionalPayment = romancy.DefineActivity("process_order_with_optional_payment",
func(ctx context.Context, orderID string) (OrderWithPaymentResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
session := wfCtx.Session()
// Outer transaction (automatic, managed by activity)
_, err := session.Exec(
"INSERT INTO orders (order_id, status) VALUES (?, ?)",
orderID, "pending",
)
if err != nil {
return OrderWithPaymentResult{}, err
}
err = romancy.SendEventTransactional(wfCtx,
"order.created",
"order-service",
map[string]any{"order_id": orderID},
)
if err != nil {
return OrderWithPaymentResult{}, err
}
// Nested transaction (savepoint)
err = wfCtx.Transaction(func() error {
// Risky payment operation
err := romancy.SendEventTransactional(wfCtx,
"payment.attempted",
"payment-service",
map[string]any{"order_id": orderID},
)
if err != nil {
return err
}
_, err = session.Exec(
"INSERT INTO payments (order_id, amount) VALUES (?, ?)",
orderID, 100.0,
)
if err != nil {
return err
}
// Simulate failure
return fmt.Errorf("payment gateway unavailable")
})
if err != nil {
// Inner transaction rolled back to savepoint
// Order creation still succeeds, but payment failed
err = romancy.SendEventTransactional(wfCtx,
"payment.failed",
"payment-service",
map[string]any{"order_id": orderID, "reason": "gateway_unavailable"},
)
if err != nil {
return OrderWithPaymentResult{}, err
}
}
// Outer transaction commits successfully
return OrderWithPaymentResult{OrderID: orderID, Status: "created"}, nil
},
)Key points:
- Outer transaction is automatically managed by the activity
- Inner
ctx.Transaction()creates a savepoint - Inner transaction failure doesn’t affect outer transaction
- Order and payment failure event are both committed
How It Works
The Transactional Outbox Pattern has two phases:
Phase 1: Write to Outbox (Synchronous)
When you call SendEventTransactional():
sequenceDiagram
participant W as Workflow
participant A as Activity
participant DB as Database
W->>A: Execute activity
A->>DB: BEGIN TRANSACTION
A->>DB: Execute business logic
A->>DB: INSERT INTO outbox_events
A->>DB: INSERT INTO workflow_history
A->>DB: COMMIT TRANSACTION
A->>W: Return result
Key Points:
- Event is written to
outbox_eventstable - Same transaction as business logic
- If transaction fails, event is not stored
- If transaction succeeds, event will eventually be published
Phase 2: Publish Events (Asynchronous)
The Outbox Relayer runs in the background:
sequenceDiagram
participant R as Outbox Relayer
participant DB as Database
participant B as Message Broker
loop Every 1 second (configurable)
R->>DB: SELECT * FROM outbox_events WHERE status='pending'
DB->>R: Return pending events
loop For each event
R->>B: POST CloudEvent
alt Success
B->>R: 200 OK
R->>DB: UPDATE status='published'
else Failure
B->>R: 500 Error
R->>DB: UPDATE retry_count++
end
end
end
Key Points:
- Polls database every 1 second (default)
- Publishes events as CloudEvents (HTTP)
- Implements fixed-interval retry (retry on every poll until max_retries)
- At-least-once delivery guarantee
Outbox Relayer Configuration
Basic Configuration
app := romancy.NewApp(
romancy.WithDatabase("orders.db"),
romancy.WithWorkerID("worker-1"),
romancy.WithOutbox(true),
romancy.WithBrokerURL("http://localhost:8080/events"),
)Advanced Configuration
// Create custom relayer
relayer := romancy.NewOutboxRelayer(
romancy.WithRelayerStorage(app.Storage()),
romancy.WithRelayerBrokerURL("http://localhost:8080/events"),
romancy.WithPollInterval(2*time.Second), // Poll every 2 seconds
romancy.WithMaxRetries(5), // Retry up to 5 times
romancy.WithBatchSize(20), // Process 20 events per batch
romancy.WithMaxAgeHours(24.0), // Expire events older than 24 hours
)
ctx := context.Background()
// Start manually
if err := relayer.Start(ctx); err != nil {
log.Fatal(err)
}
// Stop gracefully
relayer.Stop(ctx)Configuration Options:
| Parameter | Type | Default | Description |
|---|---|---|---|
WithRelayerStorage | StorageProtocol | Required | Storage backend for outbox events |
WithRelayerBrokerURL | string | Required | Message Broker URL for publishing CloudEvents |
WithPollInterval | time.Duration | 1s | Polling interval |
WithMaxRetries | int | 3 | Maximum retry attempts for temporary failures |
WithBatchSize | int | 10 | Number of events to process per batch |
WithMaxAgeHours | float64 | 0 (disabled) | Maximum event age in hours before expiration |
Max Age Timeout:
Events older than max_age_hours are automatically marked as 'expired' and will not be retried. This is useful for time-sensitive events that become meaningless after a certain period:
// Example: Expire events older than 1 hour
relayer := romancy.NewOutboxRelayer(
romancy.WithRelayerStorage(app.Storage()),
romancy.WithRelayerBrokerURL("http://localhost:8080/events"),
romancy.WithMaxAgeHours(1.0), // Events older than 1 hour → 'expired'
)Use Cases for Max Age Timeout:
- Real-time notifications: Push notifications that are only relevant for a short time
- Time-sensitive updates: Stock price updates, sensor readings
- Temporary offers: Flash sale notifications that expire quickly
- Session-based events: Events tied to user sessions that timeout
Retry Logic and Error Handling
The Outbox Relayer implements intelligent retry with error classification:
How it works:
- Polling: Every
poll_interval(default: 1s), the relayer checks for pending events - Age Check (Optional): If
max_age_hoursis set, events older than threshold →'expired'status - Retry Check: Before each send attempt, checks if
retry_count >= max_retries - Send Attempt: If retry limit not exceeded, attempts to publish the event
- Success: Event status →
'published' - Failure: Error classification determines next action:
- 4xx HTTP Error (400, 401, 403, 404, 422, etc.) →
'invalid'status (permanent failure, don’t retry) - 5xx HTTP Error (500, 502, 503, 504, etc.) →
'pending'status,retry_countincremented (retry) - Network Error (connection timeout, DNS failure) →
'pending'status,retry_countincremented (retry) - Unknown Error →
'pending'status,retry_countincremented (retry, safety net)
- 4xx HTTP Error (400, 401, 403, 404, 422, etc.) →
- Max Retries Exceeded: Event status →
'failed'(permanently failed)
Example with default settings (poll_interval=1s, max_retries=3):
| Time | retry_count | Error Type | Action |
|---|---|---|---|
| T+0s | 0 | First send attempt | |
| T+0s | 0 | → 400 Bad Request | Stop: Marked as 'invalid' (client error) |
| T+0s | 0 | → 503 Service Unavailable | Continue: retry_count = 1 |
| T+1s | 1 | → Network timeout | Continue: retry_count = 2 |
| T+2s | 2 | → 500 Internal Server Error | Continue: retry_count = 3 |
| T+3s | 3 | N/A | Stop: retry_count >= max_retries, marked as 'failed' |
Important Notes:
- Fixed Interval: Events are retried on every poll (no exponential backoff)
- Retry Limit: With
max_retries=3, events are sent 3 times total (at retry_count=0, 1, 2) - 4xx Errors Don’t Retry: Client errors (malformed event, auth failure) are immediately marked as
'invalid' - 5xx Errors Retry: Server errors (broker down, temporary failure) are retried up to max_retries
- Network Errors Retry: Connection timeouts, DNS failures are treated as temporary errors
- Status Remains ‘pending’: During retries, event status stays
'pending'(changes to'published'on success,'failed'after max retries, or'invalid'for 4xx errors) - Max Age Timeout: Events older than
max_age_hoursare marked as'expired'and not retried
Event Status Lifecycle
Events in the outbox_events table have five statuses:
pending → published (Success)
→ failed (After max_retries exceeded)
→ invalid (4xx HTTP error, permanent failure)
→ expired (Max age exceeded, too old to retry)Status Descriptions:
| Status | Meaning | Retryable | How it happens |
|---|---|---|---|
pending | Event waiting to be published | ✅ Yes | Initial status when event is created |
published | Event successfully published | ❌ No | 2xx HTTP response from broker |
failed | Event permanently failed after max retries | ❌ No | retry_count >= max_retries |
invalid | Event is malformed or unauthorized | ❌ No | 4xx HTTP error (400, 401, 403, 404, 422) |
expired | Event too old to be meaningful | ❌ No | created_at older than max_age_hours |
Database Schema:
-- SQLite / PostgreSQL / MySQL compatible schema
CREATE TABLE outbox_events (
event_id VARCHAR(255) PRIMARY KEY,
event_type VARCHAR(255) NOT NULL,
event_source VARCHAR(255) NOT NULL,
event_data TEXT NOT NULL, -- JSON (unlimited size)
content_type VARCHAR(100) NOT NULL DEFAULT 'application/json',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
published_at TIMESTAMP,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
-- Status constraint (enforced by database)
CONSTRAINT valid_outbox_status CHECK (
status IN ('pending', 'published', 'failed', 'invalid', 'expired')
)
);
-- Index for efficient polling by Outbox Relayer
CREATE INDEX idx_outbox_status_created ON outbox_events(status, created_at);Query Outbox Events:
ctx := context.Background()
// Get pending events
events, err := app.Storage().GetPendingOutboxEvents(ctx, 10)
if err != nil {
log.Fatal(err)
}
for _, event := range events {
fmt.Printf("Event: %s, Status: %s\n", event.EventType, event.Status)
}Use Cases
1. Order Processing with Events
type InventoryResult struct {
TotalAmount float64 `json:"total_amount"`
}
type ChargeResult struct {
TransactionID string `json:"transaction_id"`
}
type ShipmentResult struct {
TrackingNumber string `json:"tracking_number"`
}
type OrderProcessingResult struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
}
var orderProcessingWorkflow = romancy.DefineWorkflow("process_order",
func(ctx *romancy.WorkflowContext, orderID string, items []OrderItem) (OrderProcessingResult, error) {
// Step 1: Reserve inventory
// → Sends "inventory.reserved" event
inventory, err := reserveInventory.Execute(ctx, orderID, items)
if err != nil {
return OrderProcessingResult{}, err
}
// Step 2: Charge payment
// → Sends "payment.charged" event
_, err = chargePayment.Execute(ctx, orderID, inventory.TotalAmount)
if err != nil {
return OrderProcessingResult{}, err
}
// Step 3: Ship order
// → Sends "order.shipped" event
_, err = shipOrder.Execute(ctx, orderID)
if err != nil {
return OrderProcessingResult{}, err
}
// Final event
err = romancy.SendEventTransactional(ctx,
"order.completed",
"order-service",
map[string]any{"order_id": orderID, "status": "completed"},
)
if err != nil {
return OrderProcessingResult{}, err
}
return OrderProcessingResult{OrderID: orderID, Status: "completed"}, nil
},
)2. Compensation with Events
Send events during compensation (rollback):
var releaseInventory = romancy.DefineCompensation("release_inventory",
func(ctx context.Context, reservationID string) error {
fmt.Printf("Releasing inventory: %s\n", reservationID)
wfCtx := romancy.GetWorkflowContext(ctx)
// Send compensation event
return romancy.SendEventTransactional(wfCtx,
"inventory.released",
"order-service",
map[string]any{
"reservation_id": reservationID,
"reason": "order_failed",
},
)
},
)3. Saga Pattern Integration
var cancelReservation = romancy.DefineCompensation("cancel_hotel_reservation",
func(ctx context.Context, reservationID string) error {
wfCtx := romancy.GetWorkflowContext(ctx)
// Send cancellation event
return romancy.SendEventTransactional(wfCtx,
"hotel.cancelled",
"booking-service",
map[string]any{"reservation_id": reservationID},
)
},
)
type ReserveHotelResult struct {
ReservationID string `json:"reservation_id"`
}
var reserveHotel = romancy.DefineActivity("reserve_hotel",
func(ctx context.Context, bookingID string) (ReserveHotelResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
// Reserve hotel
reservationID := fmt.Sprintf("HOTEL-%s", bookingID)
// Send event transactionally
err := romancy.SendEventTransactional(wfCtx,
"hotel.reserved",
"booking-service",
map[string]any{"reservation_id": reservationID},
)
if err != nil {
return ReserveHotelResult{}, err
}
return ReserveHotelResult{ReservationID: reservationID}, nil
},
romancy.WithCompensation(cancelReservation),
)Best Practices
1. Activity Transaction Control
Activities are automatically transactional. All operations within an activity (including SendEventTransactional()) are executed within a single transaction.
type OrderActivityResult struct {
OrderID string `json:"order_id"`
}
// ✅ Default: Automatic transaction (recommended)
var createOrder = romancy.DefineActivity("create_order",
func(ctx context.Context, orderID string) (OrderActivityResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
// All operations are automatically transactional:
// - Workflow history recording
// - Event publishing to outbox
err := romancy.SendEventTransactional(wfCtx, "order.created", "order-service", ...)
if err != nil {
return OrderActivityResult{}, err
}
return OrderActivityResult{OrderID: orderID}, nil
},
)
// ✅ With session access for custom database operations
var createOrderWithDB = romancy.DefineActivity("create_order_with_db",
func(ctx context.Context, orderID string) (OrderActivityResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
// Access Romancy-managed session
session := wfCtx.Session()
// Your database operations
_, err := session.Exec("INSERT INTO orders (order_id) VALUES (?)", orderID)
if err != nil {
return OrderActivityResult{}, err
}
// Events in same transaction
err = romancy.SendEventTransactional(wfCtx, "order.created", "order-service", ...)
if err != nil {
return OrderActivityResult{}, err
}
return OrderActivityResult{OrderID: orderID}, nil
},
)Default behavior:
- ✅ Automatic transaction management
- ✅ Events are atomic with workflow history
- ✅ No manual begin/commit/rollback needed
2. Use CloudEvents Standard
Romancy publishes events as CloudEvents (HTTP), ensuring compatibility with:
- ✅ Knative Eventing
- ✅ CloudEvents SDK
- ✅ Any CloudEvents-compatible broker
3. Monitor Outbox Table
Monitor your outbox table regularly to detect issues:
ctx := context.Background()
// Get event status summary using raw SQL
db := app.Storage().DB()
rows, err := db.QueryContext(ctx, `
SELECT status, COUNT(*) as count
FROM outbox_events
GROUP BY status
`)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
statusCounts := make(map[string]int)
for rows.Next() {
var status string
var count int
rows.Scan(&status, &count)
statusCounts[status] = count
}
fmt.Println("📊 Outbox Status Summary:")
fmt.Printf(" - Pending: %d\n", statusCounts["pending"])
fmt.Printf(" - Published: %d\n", statusCounts["published"])
fmt.Printf(" - Failed: %d\n", statusCounts["failed"])
fmt.Printf(" - Invalid: %d\n", statusCounts["invalid"])
fmt.Printf(" - Expired: %d\n", statusCounts["expired"])
// Alert on invalid events
if statusCounts["invalid"] > 0 {
fmt.Printf("⚠️ %d events marked as invalid (4xx errors)\n", statusCounts["invalid"])
fmt.Println(" → Check event format, broker authentication, or permissions")
}
// Alert on expired events
if statusCounts["expired"] > 0 {
fmt.Printf("⚠️ %d events expired due to age\n", statusCounts["expired"])
fmt.Println(" → Check broker availability or consider increasing max_age_hours")
}
// Alert on permanently failed events
if statusCounts["failed"] > 0 {
fmt.Printf("⚠️ %d events failed permanently\n", statusCounts["failed"])
fmt.Println(" → Check broker health or network connectivity")
}4. Handle Idempotency
Since the outbox pattern provides at-least-once delivery, consumers must handle duplicate events:
// Consumer side (idempotency key)
func HandleEvent(w http.ResponseWriter, r *http.Request) {
var event cloudevents.Event
if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
eventID := event.ID()
// Check if already processed
if isEventProcessed(eventID) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{"status": "duplicate"})
return
}
// Process event
if err := processOrderCreated(event.Data()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Mark as processed
markEventProcessed(eventID)
w.WriteHeader(http.StatusOK)
}Complete Example: E-commerce Order Processing
This example demonstrates a real-world e-commerce order processing system using custom database operations with ctx.Session().
Scenario
- Create order in your database tables (same database as Romancy)
- Send “order.created” event to Romancy’s outbox
- Both operations are atomic (same transaction)
Code
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/i2y/romancy"
)
// Order model (same database as Romancy)
type Order struct {
OrderID string
CustomerEmail string
Amount float64
Status string
CreatedAt time.Time
}
type EcommerceOrderResult struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
}
// Activity with custom database operations
var createOrder = romancy.DefineActivity("create_order",
func(ctx context.Context, orderID, customerEmail string, amount float64) (EcommerceOrderResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
// Access Romancy-managed session (same database)
session := wfCtx.Session()
// 1. Save order to YOUR table (same database as Romancy)
_, err := session.Exec(`
INSERT INTO orders (order_id, customer_email, amount, status, created_at)
VALUES (?, ?, ?, ?, ?)
`, orderID, customerEmail, amount, "pending", time.Now())
if err != nil {
return EcommerceOrderResult{}, err
}
// 2. Publish event to Romancy's outbox (SAME transaction)
err = romancy.SendEventTransactional(wfCtx,
"order.created",
"order-service",
map[string]any{
"order_id": orderID,
"customer_email": customerEmail,
"amount": amount,
"status": "pending",
},
)
if err != nil {
return EcommerceOrderResult{}, err
}
// 3. Romancy automatically commits (or rolls back on error)
// If event publishing fails, order creation rolls back
// If order creation fails, event publishing rolls back
return EcommerceOrderResult{OrderID: orderID, Status: "created"}, nil
},
)
// Workflow
var orderWorkflow = romancy.DefineWorkflow("order_workflow",
func(ctx *romancy.WorkflowContext, orderID, customerEmail string, amount float64) (EcommerceOrderResult, error) {
// Create order (atomic with event)
result, err := createOrder.Execute(ctx, orderID, customerEmail, amount)
if err != nil {
return EcommerceOrderResult{}, err
}
return result, nil
},
)
func main() {
// Romancy app (all tables in same database)
app := romancy.NewApp(
romancy.WithDatabase("postgresql://user:password@localhost/ecommerce"),
romancy.WithWorkerID("worker-1"),
romancy.WithOutbox(true),
romancy.WithBrokerURL("http://localhost:8080/events"),
)
ctx := context.Background()
if err := app.Start(ctx); err != nil {
log.Fatal(err)
}
defer app.Shutdown(ctx)
// Start workflow
instanceID, err := romancy.StartWorkflow(ctx, app, orderWorkflow, OrderInput{
OrderID: "ORD-12345",
Email: "customer@example.com",
Amount: 149.99,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Order created: %s\n", instanceID)
// Event will be published by Outbox Relayer
time.Sleep(2 * time.Second)
}Database Requirements for Custom Database Operations
When using ctx.Session() to access Romancy’s managed session for custom database operations, your application tables must be in the same database as Romancy’s tables:
PostgreSQL:
- Requirement: Your tables and Romancy’s tables must be in the same PostgreSQL database (not just the same server/instance)
- Reason: PostgreSQL connections are bound to a single database
- Example: If Romancy uses
postgresql://user:pass@localhost/myapp, your tables must also be in themyappdatabase
MySQL:
- Requirement: Your tables and Romancy’s tables must be in the same MySQL database
- Reason: While MySQL technically allows cross-database transactions, we recommend using the same database for simplicity
- Example: If Romancy uses
mysql://user:pass@localhost/myapp, your tables should also be in themyappdatabase
SQLite:
- Requirement: Your tables and Romancy’s tables must be in the same SQLite file
- Reason: SQLite is file-based; cross-file transactions are not possible
- Example: If Romancy uses
workflow.db, your tables must also be inworkflow.db
Best Practice: Use the same database for both Romancy and your application tables. This simplifies deployment and ensures transactional consistency.
Key Points
- Database Requirement: See Database Requirements above for PostgreSQL/MySQL/SQLite specifics
- Session Access:
ctx.Session()provides Romancy’s managed session - Automatic Management: Romancy handles begin/commit/rollback automatically
- Atomic Guarantee: Order creation + event publishing commit together
- Rollback Safety: If any step fails, everything rolls back
What Happens
1. Order saved to YOUR database (orders table)
2. Event saved to Romancy's outbox (outbox_events table)
3. Both committed atomically (COMMIT)
4. Outbox Relayer publishes event (async, background)
5. Event delivered to broker (at-least-once guarantee)What You Learned
- ✅ Transactional Outbox Pattern: Solves the dual-write problem
- ✅ SendEventTransactional(): Writes events to outbox table
- ✅ ctx.Session(): Access Romancy’s managed session for custom database operations
- ✅ Automatic Transaction Management: Romancy handles begin/commit/rollback
- ✅ Rollback Safety: All operations roll back on failure
- ✅ Outbox Relayer: Publishes events asynchronously with retry
- ✅ At-Least-Once Delivery: Guaranteed event delivery
- ✅ CloudEvents Support: Standard event format
See Also
- Saga Pattern: Combine with compensation
- CloudEvents HTTP Binding: CloudEvents specification compliance
- Lifecycle Hooks: Monitor outbox events with lifecycle hooks