Transactional Outbox Pattern¶
Edda provides built-in support for the Transactional Outbox Pattern, ensuring that events are published reliably and atomically with workflow execution.
What is the Transactional Outbox Pattern?¶
The Transactional Outbox Pattern solves the dual-write problem in distributed systems:
Problem:
# ❌ This can fail!
await db.save_order(order) # Step 1: Success
await broker.publish_event(...) # Step 2: Fails! ← Event lost
If the broker is down or the network fails, the event is lost even though the order was saved.
Solution:
# ✅ Atomic operation
async with transaction:
    await db.save_order(order)            # Step 1: Save order
    await db.save_to_outbox_table(event)  # Step 2: Save event
# Later: Background process publishes events from outbox
Both operations succeed or fail together. A background Outbox Relayer publishes events asynchronously.
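The idea can be sketched with Python's standard-library sqlite3 module instead of Edda itself. The table layout and the helper names (`make_db`, `save_order_with_event`, `count`) are assumptions made for illustration only; the point is that the order row and the outbox row share one transaction.

```python
import json
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with a business table and an outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount REAL NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox_events ("
        " event_id TEXT PRIMARY KEY,"
        " event_type TEXT NOT NULL,"
        " event_data TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    conn.commit()
    return conn


def save_order_with_event(conn, order_id, amount, fail_before_commit=False):
    """Write the order and its event in one transaction; return True if committed."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, amount))
            conn.execute(
                "INSERT INTO outbox_events (event_id, event_type, event_data)"
                " VALUES (?, ?, ?)",
                (str(uuid.uuid4()), "order.created", json.dumps({"order_id": order_id})),
            )
            if fail_before_commit:
                raise RuntimeError("simulated crash before commit")
        return True
    except RuntimeError:
        return False


def count(conn, table):
    """Count rows in one of the two illustration tables."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

If the simulated failure fires, neither the order nor the event is stored, so a later relayer never sees an event for an order that does not exist.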
Quick Start¶
1. Send Events Transactionally¶
Use send_event_transactional() inside activities:
from edda import activity, WorkflowContext, send_event_transactional

@activity
async def create_order(ctx: WorkflowContext, order_id: str, amount: float):
    """Create an order and send event transactionally."""
    # Business logic
    print(f"Creating order {order_id}")

    # Send event transactionally
    # This writes to the outbox table in the same transaction
    await send_event_transactional(
        ctx,
        event_type="order.created",
        event_source="order-service",
        event_data={
            "order_id": order_id,
            "amount": amount,
            "status": "pending"
        }
    )
    return {"order_id": order_id, "status": "created"}
2. Enable Outbox Relayer¶
Configure EddaApp with outbox_enabled=True:
from edda import EddaApp

app = EddaApp(
    service_name="order-service",
    db_url="sqlite:///orders.db",
    outbox_enabled=True,  # Enable outbox relayer
    broker_url="http://localhost:8080/events"  # CloudEvents endpoint
)
await app.initialize()
That's it! Events are now published reliably.
Custom Database Operations (Advanced)¶
For true atomic transactions between your business logic and event publishing, you can access Edda's managed database session using ctx.session.
Why Use ctx.session?¶
The basic approach (send_event_transactional() inside activities) uses Edda's internal transaction to write to the outbox table. This is sufficient for most use cases, where the activity only involves:
- Read-only operations
- External API calls
- Edda's own workflow history
However, if you need to write to your own database and guarantee atomicity with event publishing, use ctx.session:
Example: Payment Processing with Custom Database Operations¶
from sqlalchemy import Column, String, Float
from sqlalchemy.orm import declarative_base
from edda import WorkflowContext, activity
from edda.outbox.transactional import send_event_transactional

# Define your ORM models (must be in the same database as Edda)
Base = declarative_base()

class Payment(Base):
    __tablename__ = "payments"
    order_id = Column(String, primary_key=True)
    amount = Column(Float, nullable=False)
    status = Column(String, nullable=False)

@activity  # Edda automatically manages the transaction
async def process_payment(ctx: WorkflowContext, order_id: str, amount: float):
    """Process payment with custom database operations."""
    # Access Edda-managed session (same database as Edda)
    session = ctx.session

    # Your business logic (same transaction as Edda)
    payment = Payment(order_id=order_id, amount=amount, status="completed")
    session.add(payment)

    # Edda event (same transaction)
    await send_event_transactional(
        ctx,
        event_type="payment.processed",
        event_source="payment-service",
        event_data={"order_id": order_id, "amount": amount}
    )

    # Edda automatically commits (or rolls back on exception):
    # 1. Your payment record (same database)
    # 2. Edda's outbox event
    # 3. Edda's workflow history
    return {"order_id": order_id, "status": "processed"}
✅ Automatic Transaction Management¶
Activities automatically manage transactions:
@activity  # Edda automatically manages the transaction
async def process_payment(ctx: WorkflowContext, order_id: str, amount: float):
    # Access Edda-managed session
    session = ctx.session

    # Your business logic (status is required: the column is NOT NULL)
    payment = Payment(order_id=order_id, amount=amount, status="completed")
    session.add(payment)

    # Events in same transaction
    await send_event_transactional(ctx, ...)

    # Edda automatically commits (or rolls back on exception)
    return {"order_id": order_id}
How it works:
When you access ctx.session inside an activity, Edda automatically:
- Provides managed session: Returns the current transaction's session
- Single transaction: All operations use the same session
- Commits or rolls back: Commits on success, rolls back on exception
This results in:
- Single Transaction: Your payment data + Edda's outbox + history commit together
- Atomic Guarantee: Either all succeed or all fail (no data inconsistency)
- Rollback Safety: If any operation fails, everything rolls back
Key Points¶
- ✅ Atomic Guarantee: Your data + outbox event + history commit together
- ✅ Single Transaction: All operations use Edda's managed session
- ✅ Automatic Management: Edda handles begin/commit/rollback
- ✅ Rollback Safety: If any operation fails, everything rolls back
- ⚠️ Database Requirement: See Database Requirements below
When to Use ctx.session¶
| Use Case | Recommended Approach |
|---|---|
| Writing to your own database + events | ctx.session - Ensures atomicity |
| Read-only operations | Standard @activity - No custom DB access needed |
| External API calls | Standard @activity - No database transaction needed |
Advanced Transaction Control¶
Activities are automatically transactional (see Automatic Transaction Management above). However, in advanced scenarios, you may need finer control over transaction boundaries using ctx.transaction().
When to Use Advanced Transaction Control¶
- Nested transactions (savepoints) - Partial rollback within activities
- Workflow-level transactions - Manual control in workflow functions (not activities)
Nested Transactions (Savepoints) in Activities¶
Within an activity, you can use ctx.transaction() for savepoints:
from edda import WorkflowContext, activity
from edda.outbox.transactional import send_event_transactional

# Order and Payment are ORM models defined as in the other examples on this page

@activity
async def process_order_with_optional_payment(ctx: WorkflowContext, order_id: str):
    # Outer transaction (automatic, managed by @activity)
    session = ctx.session
    order = Order(order_id=order_id, status="pending")
    session.add(order)
    await send_event_transactional(
        ctx,
        event_type="order.created",
        event_source="order-service",
        event_data={"order_id": order_id}
    )

    try:
        async with ctx.transaction():  # Nested transaction (savepoint)
            # Risky payment operation
            await send_event_transactional(
                ctx,
                event_type="payment.attempted",
                event_source="payment-service",
                event_data={"order_id": order_id}
            )
            payment = Payment(order_id=order_id, amount=100.0, status="pending")
            session.add(payment)
            # Simulate failure
            raise Exception("Payment gateway unavailable")
    except Exception:
        # Inner transaction rolled back to savepoint
        # Order creation still succeeds, but payment failed
        await send_event_transactional(
            ctx,
            event_type="payment.failed",
            event_source="payment-service",
            event_data={"order_id": order_id, "reason": "gateway_unavailable"}
        )

    # Outer transaction commits successfully
    return {"order_id": order_id, "status": "created"}
Key points:
- Outer transaction is automatically managed by @activity
- Inner ctx.transaction() creates a savepoint
- Inner transaction failure doesn't affect outer transaction
- Order and payment failure event are both committed
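The savepoint behavior itself is a database feature and can be observed without Edda. The following sketch uses the standard-library sqlite3 module with manual transaction control; the `log` table and the `run_order` helper are hypothetical and only stand in for the order, payment, and event writes in the activity.

```python
import sqlite3


def run_order(payment_fails: bool) -> list[str]:
    """Run an outer transaction with one savepoint and return the committed entries."""
    # isolation_level=None disables implicit transactions so BEGIN/COMMIT are explicit
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE log (entry TEXT NOT NULL)")

    conn.execute("BEGIN")  # outer transaction
    conn.execute("INSERT INTO log VALUES ('order.created')")

    conn.execute("SAVEPOINT payment")  # nested transaction
    try:
        conn.execute("INSERT INTO log VALUES ('payment.attempted')")
        if payment_fails:
            raise RuntimeError("Payment gateway unavailable")
        conn.execute("RELEASE SAVEPOINT payment")
    except RuntimeError:
        # Undo only the work done since the savepoint, then discard the savepoint
        conn.execute("ROLLBACK TO SAVEPOINT payment")
        conn.execute("RELEASE SAVEPOINT payment")
        conn.execute("INSERT INTO log VALUES ('payment.failed')")

    conn.execute("COMMIT")  # outer transaction still commits
    return [row[0] for row in conn.execute("SELECT entry FROM log ORDER BY rowid")]
```

When the payment fails, only `order.created` and `payment.failed` survive; the `payment.attempted` row written inside the savepoint is discarded.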
Transaction Control in Workflows¶
In workflow functions (not activities), you can use ctx.transaction() directly:
from edda import workflow, WorkflowContext
from edda.outbox.transactional import send_event_transactional

@workflow
async def custom_workflow(ctx: WorkflowContext, data: dict):
    # Manual transaction control (workflow functions don't auto-manage transactions)
    async with ctx.transaction():
        await ctx.storage.append_history(
            instance_id=ctx.instance_id,
            activity_id="custom_step:1",
            event_type="CustomEvent",
            event_data={"data": data}
        )
        await send_event_transactional(
            ctx,
            event_type="workflow.started",
            event_source="my-service",
            event_data={"data": data}
        )

    # Activities manage their own transactions
    # (my_activity is an @activity defined elsewhere)
    result = await my_activity(ctx, data)
    return result
Note: This is rarely needed - prefer using activities for transactional operations.
How It Works¶
The Transactional Outbox Pattern has two phases:
Phase 1: Write to Outbox (Synchronous)¶
When you call send_event_transactional():
sequenceDiagram
participant W as Workflow
participant A as Activity
participant DB as Database
W->>A: Execute activity
A->>DB: BEGIN TRANSACTION
A->>DB: Execute business logic
A->>DB: INSERT INTO outbox_events
A->>DB: INSERT INTO workflow_history
A->>DB: COMMIT TRANSACTION
A->>W: Return result
Key Points:
- Event is written to the outbox_events table
- Same transaction as business logic
- If transaction fails, event is not stored
- If transaction succeeds, event will eventually be published
Phase 2: Publish Events (Asynchronous)¶
The Outbox Relayer runs in the background:
sequenceDiagram
participant R as Outbox Relayer
participant DB as Database
participant B as Message Broker
loop Every 1 second (configurable)
R->>DB: SELECT * FROM outbox_events WHERE status='pending'
DB->>R: Return pending events
loop For each event
R->>B: POST CloudEvent
alt Success
B->>R: 200 OK
R->>DB: UPDATE status='published'
else Failure
B->>R: 500 Error
R->>DB: UPDATE retry_count++
end
end
end
Key Points:
- Polls database every 1 second (default)
- Publishes events as CloudEvents (HTTP)
- Implements fixed-interval retry (retry on every poll until max_retries)
- At-least-once delivery guarantee
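One polling pass of such a relayer might look like the sketch below. It is not Edda's implementation: it uses the standard-library sqlite3 module, assumes a simplified `outbox_events` table, and takes a hypothetical `publish` callable that stands in for the HTTP POST and returns True on success.

```python
import sqlite3
from typing import Callable


def relay_once(
    conn: sqlite3.Connection,
    publish: Callable[[str, str], bool],
    batch_size: int = 10,
) -> int:
    """Run one polling pass and return the number of events published."""
    rows = conn.execute(
        "SELECT event_id, event_type, event_data FROM outbox_events "
        "WHERE status = 'pending' ORDER BY created_at LIMIT ?",
        (batch_size,),
    ).fetchall()

    published = 0
    for event_id, event_type, event_data in rows:
        if publish(event_type, event_data):
            conn.execute(
                "UPDATE outbox_events SET status = 'published' WHERE event_id = ?",
                (event_id,),
            )
            published += 1
        else:
            # Leave the event pending so the next poll retries it
            conn.execute(
                "UPDATE outbox_events SET retry_count = retry_count + 1 WHERE event_id = ?",
                (event_id,),
            )
    conn.commit()
    return published
```

Because the status update happens after the send, a crash between the two steps causes the event to be sent again on the next pass. That is where the at-least-once (rather than exactly-once) guarantee comes from.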
Outbox Relayer Configuration¶
Basic Configuration¶
app = EddaApp(
    service_name="order-service",
    db_url="sqlite:///orders.db",
    outbox_enabled=True,
    broker_url="http://localhost:8080/events",
)
Advanced Configuration¶
from edda import EddaApp, OutboxRelayer

# Create custom relayer
relayer = OutboxRelayer(
    storage=app.storage,
    broker_url="http://localhost:8080/events",
    poll_interval=2.0,   # Poll every 2 seconds
    max_retries=5,       # Retry up to 5 times
    batch_size=20,       # Process 20 events per batch
    max_age_hours=24.0,  # Expire events older than 24 hours (optional)
)

# Start manually
await relayer.start()

# Stop gracefully
await relayer.stop()
Configuration Options:
| Parameter | Type | Default | Description |
|---|---|---|---|
| storage | StorageProtocol | Required | Storage backend for outbox events |
| broker_url | str | Required | Message Broker URL for publishing CloudEvents |
| poll_interval | float | 1.0 | Polling interval in seconds |
| max_retries | int | 3 | Maximum retry attempts for temporary failures |
| batch_size | int | 10 | Number of events to process per batch |
| max_age_hours | float \| None | None | Maximum event age in hours before expiration (disabled by default) |
Max Age Timeout:
Events older than max_age_hours are automatically marked as 'expired' and will not be retried. This is useful for time-sensitive events that become meaningless after a certain period:
# Example: Expire events older than 1 hour
relayer = OutboxRelayer(
    storage=app.storage,
    broker_url="http://localhost:8080/events",
    max_age_hours=1.0,  # Events older than 1 hour → 'expired'
)
Use Cases for Max Age Timeout:
- Real-time notifications: Push notifications that are only relevant for a short time
- Time-sensitive updates: Stock price updates, sensor readings
- Temporary offers: Flash sale notifications that expire quickly
- Session-based events: Events tied to user sessions that timeout
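The age check reduces to a timestamp comparison. The helper below is a hypothetical sketch, not Edda's code; in particular, whether an event exactly at the threshold counts as expired is an assumption (here it does not).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(created_at: datetime, max_age_hours: Optional[float], now: datetime) -> bool:
    """Return True if the event is older than max_age_hours (None disables the check)."""
    if max_age_hours is None:
        return False
    return now - created_at > timedelta(hours=max_age_hours)


# Example: an event created two hours ago, checked against a 1-hour limit
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
two_hours_old = now - timedelta(hours=2)
expired = is_expired(two_hours_old, 1.0, now)
```

Passing `now` explicitly keeps the function deterministic and easy to test; both timestamps should be timezone-aware so the subtraction is well defined.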
Retry Logic and Error Handling¶
The Outbox Relayer classifies each failure to decide whether the event should be retried:
How it works:
- Polling: Every poll_interval seconds (default: 1s), the relayer checks for pending events
- Age Check (Optional): If max_age_hours is set, events older than threshold → 'expired' status
- Retry Check: Before each send attempt, checks if retry_count >= max_retries
- Send Attempt: If retry limit not exceeded, attempts to publish the event
- Success: Event status → 'published'
- Failure: Error classification determines next action:
  - 4xx HTTP Error (400, 401, 403, 404, 422, etc.) → 'invalid' status (permanent failure, don't retry)
  - 5xx HTTP Error (500, 502, 503, 504, etc.) → 'pending' status, retry_count incremented (retry)
  - Network Error (connection timeout, DNS failure) → 'pending' status, retry_count incremented (retry)
  - Unknown Error → 'pending' status, retry_count incremented (retry, safety net)
- Max Retries Exceeded: Event status → 'failed' (permanently failed)
Example with default settings (poll_interval=1s, max_retries=3):
| Time | retry_count | Error Type | Action |
|---|---|---|---|
| T+0s | 0 | First send attempt | |
| T+0s | 0 | → 400 Bad Request | Stop: Marked as 'invalid' (client error) |
| T+0s | 0 | → 503 Service Unavailable | Continue: retry_count = 1 |
| T+1s | 1 | → Network timeout | Continue: retry_count = 2 |
| T+2s | 2 | → 500 Internal Server Error | Continue: retry_count = 3 |
| T+3s | 3 | N/A | Stop: retry_count >= max_retries, marked as 'failed' |
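The classification rules and the timeline above can be expressed as a small pure function. This is an illustrative sketch rather than the relayer's actual code: `next_state` and `simulate` are hypothetical names, and `None` is used here to represent a network or unknown error.

```python
from typing import Optional


def next_state(retry_count: int, max_retries: int, http_status: Optional[int]) -> tuple[str, int]:
    """Return (status, retry_count) after one relayer pass over a pending event."""
    if retry_count >= max_retries:
        return "failed", retry_count  # retry check runs before any send attempt
    if http_status is not None and 200 <= http_status < 300:
        return "published", retry_count
    if http_status is not None and 400 <= http_status < 500:
        return "invalid", retry_count  # client error: retrying will not help
    # 5xx, network errors (None), and anything unexpected are treated as temporary
    return "pending", retry_count + 1


def simulate(responses: list[Optional[int]], max_retries: int = 3) -> list[tuple[str, int]]:
    """Feed one response per poll to a single event and record the state after each pass."""
    status, retry_count = "pending", 0
    remaining = list(responses)
    timeline = []
    while status == "pending":
        # Only consume a response when a send actually happens
        sending = retry_count < max_retries
        response = remaining.pop(0) if (sending and remaining) else None
        status, retry_count = next_state(retry_count, max_retries, response)
        timeline.append((status, retry_count))
    return timeline
```

Calling `simulate([503, None, 500])` walks through the same sequence as the table: three failed sends raise retry_count to 3, and the fourth pass marks the event as failed without sending.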
Important Notes:
- Fixed Interval: Events are retried on every poll (no exponential backoff)
- Retry Limit: With max_retries=3, events are sent 3 times total (at retry_count=0, 1, 2)
- 4xx Errors Don't Retry: Client errors (malformed event, auth failure) are immediately marked as 'invalid'
- 5xx Errors Retry: Server errors (broker down, temporary failure) are retried up to max_retries
- Network Errors Retry: Connection timeouts, DNS failures are treated as temporary errors
- Status Remains 'pending': During retries, event status stays 'pending' (changes to 'published' on success, 'failed' after max retries, or 'invalid' for 4xx errors)
- Max Age Timeout: Events older than max_age_hours are marked as 'expired' and not retried
With Pydantic Models¶
send_event_transactional() supports Pydantic models:
from pydantic import BaseModel
from edda import WorkflowContext, activity, send_event_transactional

class OrderCreated(BaseModel):
    order_id: str
    customer_id: str
    amount: float
    items: list[str]

@activity
async def create_order(ctx: WorkflowContext, order_data: dict):
    # Create Pydantic model
    event = OrderCreated(
        order_id="ORD-123",
        customer_id="CUST-456",
        amount=99.99,
        items=["item1", "item2"]
    )

    # Send Pydantic model (auto-converted to JSON)
    await send_event_transactional(
        ctx,
        event_type="order.created",
        event_source="order-service",
        event_data=event  # ✅ Pydantic model supported
    )
    return {"status": "created"}
Event Status Lifecycle¶
Events in the outbox_events table have five statuses:
pending → published (Success)
        → failed    (After max_retries exceeded)
        → invalid   (4xx HTTP error, permanent failure)
        → expired   (Max age exceeded, too old to retry)
Status Descriptions:
| Status | Meaning | Retryable | How it happens |
|---|---|---|---|
| pending | Event waiting to be published | ✅ Yes | Initial status when event is created |
| published | Event successfully published | ❌ No | 2xx HTTP response from broker |
| failed | Event permanently failed after max retries | ❌ No | retry_count >= max_retries |
| invalid | Event is malformed or unauthorized | ❌ No | 4xx HTTP error (400, 401, 403, 404, 422) |
| expired | Event too old to be meaningful | ❌ No | created_at older than max_age_hours |
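In application code, the lifecycle can be mirrored with a small enum. The names below (`OutboxStatus`, `TERMINAL`, `can_transition`, `is_retryable`) are hypothetical and not part of Edda's API; the sketch only encodes the rule from the table that pending is the sole non-terminal status.

```python
from enum import Enum


class OutboxStatus(str, Enum):
    PENDING = "pending"
    PUBLISHED = "published"
    FAILED = "failed"
    INVALID = "invalid"
    EXPIRED = "expired"


# Every status except PENDING is final: the relayer never picks these up again
TERMINAL = frozenset(
    {OutboxStatus.PUBLISHED, OutboxStatus.FAILED, OutboxStatus.INVALID, OutboxStatus.EXPIRED}
)


def can_transition(current: OutboxStatus, new: OutboxStatus) -> bool:
    """Only pending events may change status, and only to a terminal status."""
    return current is OutboxStatus.PENDING and new in TERMINAL


def is_retryable(status: OutboxStatus) -> bool:
    """Matches the Retryable column of the table above."""
    return status is OutboxStatus.PENDING
```

Subclassing `str` lets the enum members compare equal to the raw strings stored in the status column.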
Database Schema:
-- SQLite / PostgreSQL / MySQL compatible schema
CREATE TABLE outbox_events (
    event_id VARCHAR(255) PRIMARY KEY,
    event_type VARCHAR(255) NOT NULL,
    event_source VARCHAR(255) NOT NULL,
    event_data TEXT NOT NULL,  -- JSON (unlimited size)
    content_type VARCHAR(100) NOT NULL DEFAULT 'application/json',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    published_at TIMESTAMP,
    status VARCHAR(50) NOT NULL DEFAULT 'pending',
    retry_count INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    -- Status constraint (enforced by database)
    CONSTRAINT valid_outbox_status CHECK (
        status IN ('pending', 'published', 'failed', 'invalid', 'expired')
    )
);
-- Index for efficient polling by Outbox Relayer
CREATE INDEX idx_outbox_status_created ON outbox_events(status, created_at);
Notes:
- Column types: Actual implementation uses precise types (VARCHAR with limits, TIMESTAMP with timezone support for PostgreSQL)
- TIMESTAMP: PostgreSQL/MySQL use timezone-aware TIMESTAMP; SQLite uses TEXT with ISO 8601 format
- Constraint: Database-level enforcement ensures status values are valid
- Index: Optimizes queries for pending events (used by Outbox Relayer)
Query Outbox Events:
# Get pending events
events = await app.storage.get_pending_outbox_events(limit=10)
for event in events:
    print(f"Event: {event['event_type']}, Status: {event['status']}")
Use Cases¶
1. Order Processing with Events¶
@workflow
async def process_order(
    ctx: WorkflowContext,
    order_id: str,
    items: list[dict],
    total_amount: float,  # passed in by the caller
):
    # Step 1: Reserve inventory
    inventory = await reserve_inventory(ctx, order_id, items)
    # → Sends "inventory.reserved" event

    # Step 2: Charge payment
    payment = await charge_payment(ctx, order_id, total_amount)
    # → Sends "payment.charged" event

    # Step 3: Ship order
    shipment = await ship_order(ctx, order_id)
    # → Sends "order.shipped" event

    # Final event: workflow functions don't auto-manage transactions,
    # so wrap the call in ctx.transaction()
    async with ctx.transaction():
        await send_event_transactional(
            ctx,
            event_type="order.completed",
            event_source="order-service",
            event_data={"order_id": order_id, "status": "completed"}
        )
    return {"order_id": order_id, "status": "completed"}
2. Compensation with Events¶
Send events during compensation (rollback):
@compensation
async def release_inventory(ctx: WorkflowContext, reservation_id: str):
    """Release inventory and publish event."""
    print(f"Releasing inventory: {reservation_id}")

    # Send compensation event
    await send_event_transactional(
        ctx,
        event_type="inventory.released",
        event_source="order-service",
        event_data={
            "reservation_id": reservation_id,
            "reason": "order_failed"
        }
    )
3. Saga Pattern Integration¶
# Define the compensation first so it exists when @on_failure references it
@compensation
async def cancel_reservation(ctx: WorkflowContext, reservation_id: str):
    # Send cancellation event
    await send_event_transactional(
        ctx,
        event_type="hotel.cancelled",
        event_source="booking-service",
        event_data={"reservation_id": reservation_id}
    )

@activity
@on_failure(cancel_reservation)
async def reserve_hotel(ctx: WorkflowContext, booking_id: str):
    # Reserve hotel
    reservation_id = f"HOTEL-{booking_id}"

    # Send event transactionally
    await send_event_transactional(
        ctx,
        event_type="hotel.reserved",
        event_source="booking-service",
        event_data={"reservation_id": reservation_id}
    )
    return {"reservation_id": reservation_id}
Best Practices¶
1. Activity Transaction Control¶
Activities are automatically transactional. All operations within an activity (including send_event_transactional()) are executed within a single transaction.
# ✅ Default: Automatic transaction (recommended)
@activity
async def create_order(ctx: WorkflowContext, order_id: str):
    # All operations are automatically transactional:
    # - Workflow history recording
    # - Event publishing to outbox
    await send_event_transactional(ctx, "order.created", ...)

# ✅ With external session sharing
@activity
async def create_order_with_db(ctx: WorkflowContext, order_id: str):
    # Access Edda-managed session
    session = ctx.session

    # Your database operations
    order = Order(order_id=order_id)
    session.add(order)

    # Events in same transaction
    await send_event_transactional(ctx, "order.created", ...)

# ❌ Bad: Outside activity (no transaction)
@workflow
async def order_workflow(ctx: WorkflowContext):
    await send_event_transactional(ctx, ...)  # ⚠️ Warning logged
Default behavior:
- ✅ Automatic transaction management
- ✅ Events are atomic with workflow history
- ✅ No manual begin/commit/rollback needed
2. Understanding Transaction Warnings¶
When you call send_event_transactional() outside of a transaction, Edda logs a warning but still sends the event. However, atomicity is not guaranteed.
Warning Message:
WARNING: send_event_transactional() called outside of a transaction.
Event will still be sent, but atomicity with other operations is not guaranteed.
Consider using @activity (with default transactional=True) or wrapping in ctx.transaction().
How to fix:
# ❌ Problem: No transaction
@workflow
async def my_workflow(ctx: WorkflowContext):
    await send_event_transactional(ctx, ...)  # ⚠️ Warning!

# ✅ Solution: Use activity (automatic transaction)
@activity
async def my_activity(ctx: WorkflowContext):
    await send_event_transactional(ctx, ...)  # ✅ Atomic
Detecting if you're in a transaction:
3. Use CloudEvents Standard¶
Edda publishes events as CloudEvents (HTTP), ensuring compatibility with:
- ✅ Knative Eventing
- ✅ CloudEvents SDK
- ✅ Any CloudEvents-compatible broker
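For orientation, a CloudEvents 1.0 envelope in structured JSON mode carries four required attributes: specversion, id, source, and type. The sketch below shows how an outbox row could map onto such an envelope; the mapping and the `to_cloudevent` helper are assumptions for illustration, not Edda's internal serializer.

```python
import json
from datetime import datetime, timezone

# Media type for a whole CloudEvent encoded as JSON (structured content mode)
STRUCTURED_CONTENT_TYPE = "application/cloudevents+json"


def to_cloudevent(
    event_id: str,
    event_type: str,
    event_source: str,
    event_data: dict,
    created_at: datetime,
) -> dict:
    """Build a CloudEvents 1.0 envelope from outbox-style fields (created_at must be tz-aware)."""
    return {
        "specversion": "1.0",       # required
        "id": event_id,             # required, unique per source
        "source": event_source,     # required
        "type": event_type,         # required
        "datacontenttype": "application/json",
        "time": created_at.astimezone(timezone.utc).isoformat(),  # RFC 3339 timestamp
        "data": event_data,
    }


envelope = to_cloudevent(
    "evt-1",
    "order.created",
    "order-service",
    {"order_id": "ORD-1"},
    datetime(2024, 1, 1, tzinfo=timezone.utc),
)
body = json.dumps(envelope)  # request body for an HTTP POST in structured mode
```

The `id` attribute is what consumers typically use as the deduplication key when handling redelivered events.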
4. Monitor Outbox Table¶
Monitor your outbox table regularly to detect issues:
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from edda.storage.sqlalchemy_storage import OutboxEvent

# Get event status summary
async with AsyncSession(app.storage.engine) as session:
    result = await session.execute(
        select(
            OutboxEvent.status,
            func.count(OutboxEvent.event_id).label("count")
        ).group_by(OutboxEvent.status)
    )
    # Unpack rows as tuples: row.count is a Row method, not the labeled column
    status_counts = {status: count for status, count in result}

print("📊 Outbox Status Summary:")
print(f" - Pending: {status_counts.get('pending', 0)}")
print(f" - Published: {status_counts.get('published', 0)}")
print(f" - Failed: {status_counts.get('failed', 0)}")
print(f" - Invalid: {status_counts.get('invalid', 0)}")  # 4xx errors
print(f" - Expired: {status_counts.get('expired', 0)}")  # Too old

# Alert on invalid events (indicates malformed events or configuration issues)
if status_counts.get('invalid', 0) > 0:
    print(f"⚠️ {status_counts['invalid']} events marked as invalid (4xx errors)")
    print(" → Check event format, broker authentication, or permissions")

# Alert on expired events (might indicate broker outage)
if status_counts.get('expired', 0) > 0:
    print(f"⚠️ {status_counts['expired']} events expired due to age")
    print(" → Check broker availability or consider increasing max_age_hours")

# Alert on permanently failed events (max retries exceeded)
if status_counts.get('failed', 0) > 0:
    print(f"⚠️ {status_counts['failed']} events failed permanently")
    print(" → Check broker health or network connectivity")
Monitoring Best Practices:
- Set up alerts for invalid status (indicates configuration issues)
- Monitor expired count (indicates prolonged broker unavailability)
- Track failed events (indicates persistent problems)
- Review last_error field to diagnose root causes:
# Get details of invalid events
async with AsyncSession(app.storage.engine) as session:
    invalid_events = await session.execute(
        select(OutboxEvent).where(OutboxEvent.status == 'invalid').limit(10)
    )
    for event in invalid_events.scalars():
        print(f"Invalid Event: {event.event_type}")
        print(f" Error: {event.last_error}")
        print(f" Created: {event.created_at}")
5. Handle Idempotency¶
Since the outbox pattern provides at-least-once delivery, consumers must handle duplicate events:
# Consumer side (idempotency key)
@app.post("/events")
async def handle_event(event: CloudEvent):
    event_id = event["id"]

    # Check if already processed
    if await db.event_already_processed(event_id):
        return {"status": "duplicate"}

    # Process event
    await process_order_created(event.data)

    # Mark as processed
    await db.mark_event_processed(event_id)
    return {"status": "processed"}
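A check followed by a separate mark can still process an event twice if two deliveries arrive concurrently. One common alternative, sketched here with the standard-library sqlite3 module, is to record the event ID and apply the side effect in the same transaction, relying on a primary key to reject duplicates. The table names and the `handle_event` signature are hypothetical.

```python
import sqlite3


def make_consumer_db() -> sqlite3.Connection:
    """Create a consumer-side database with a dedup table and a business table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE shipments (order_id TEXT NOT NULL)")
    conn.commit()
    return conn


def handle_event(conn: sqlite3.Connection, event: dict) -> str:
    """Process an event at most once per event ID; return 'processed' or 'duplicate'."""
    with conn:  # one transaction: dedup marker and side effect commit together
        cursor = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event["id"],),
        )
        if cursor.rowcount == 0:
            return "duplicate"  # the ID was already recorded by an earlier delivery
        conn.execute(
            "INSERT INTO shipments (order_id) VALUES (?)",
            (event["data"]["order_id"],),
        )
    return "processed"
```

If the side effect raises, the transaction rolls back and the event ID is not recorded, so a redelivery gets another chance.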
Complete Example: E-commerce Order Processing¶
This example demonstrates a real-world e-commerce order processing system using custom database operations with ctx.session.
Scenario¶
- Create order in your database tables (same database as Edda)
- Send "order.created" event to Edda's outbox
- Both operations are atomic (same transaction)
Code¶
from sqlalchemy import Column, String, Float, DateTime
from sqlalchemy.orm import declarative_base
from datetime import datetime
from edda import EddaApp, WorkflowContext, activity, workflow
from edda.outbox.transactional import send_event_transactional

# Your database models (same database as Edda)
Base = declarative_base()

class Order(Base):
    __tablename__ = "orders"
    order_id = Column(String(255), primary_key=True)
    customer_email = Column(String(255), nullable=False)
    amount = Column(Float, nullable=False)
    status = Column(String(50), nullable=False)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)

# Edda app (all tables in same database)
edda_app = EddaApp(
    service_name="order-service",
    db_url="postgresql+asyncpg://user:password@localhost/ecommerce",
    outbox_enabled=True,
    broker_url="http://localhost:8080/events"
)
# Activity with custom database operations
@activity  # Edda automatically manages the transaction
async def create_order(
    ctx: WorkflowContext,
    order_id: str,
    customer_email: str,
    amount: float
):
    """Create order with atomic event publishing."""
    # Access Edda-managed session (same database)
    session = ctx.session

    # 1. Save order to YOUR table (same database as Edda)
    order = Order(
        order_id=order_id,
        customer_email=customer_email,
        amount=amount,
        status="pending"
    )
    session.add(order)

    # 2. Write event to Edda's outbox (SAME transaction)
    await send_event_transactional(
        ctx,
        event_type="order.created",
        event_source="order-service",
        event_data={
            "order_id": order_id,
            "customer_email": customer_email,
            "amount": amount,
            "status": "pending"
        }
    )

    # 3. Edda automatically commits (or rolls back on exception)
    # If the outbox write fails, order creation rolls back
    # If order creation fails, the outbox write rolls back
    return {"order_id": order_id, "status": "created"}

# Workflow
@workflow
async def order_workflow(
    ctx: WorkflowContext,
    order_id: str,
    customer_email: str,
    amount: float
):
    """Order processing workflow."""
    # Create order (atomic with event)
    result = await create_order(ctx, order_id, customer_email, amount)
    return result
# Run
import asyncio

async def main():
    await edda_app.initialize()

    # Start workflow
    instance_id = await order_workflow.start(
        order_id="ORD-12345",
        customer_email="customer@example.com",
        amount=149.99
    )
    print(f"Order created: {instance_id}")

    # Event will be published by Outbox Relayer
    await asyncio.sleep(2)
    await edda_app.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
Database Requirements for Custom Database Operations¶
When using ctx.session to access Edda's managed session for custom database operations, your application tables must be in the same database as Edda's tables:
PostgreSQL:
- Requirement: Your tables and Edda's tables must be in the same PostgreSQL database (not just the same server/instance)
- Reason: PostgreSQL connections are bound to a single database
- Example: If Edda uses postgresql+asyncpg://user:pass@localhost/myapp, your tables must also be in the myapp database
MySQL:
- Requirement: Your tables and Edda's tables must be in the same MySQL database
- Reason: While MySQL technically allows cross-database transactions, we recommend using the same database for simplicity
- Example: If Edda uses mysql+aiomysql://user:pass@localhost/myapp, your tables should also be in the myapp database
SQLite:
- Requirement: Your tables and Edda's tables must be in the same SQLite file
- Reason: SQLite is file-based; cross-file transactions are not possible
- Example: If Edda uses sqlite+aiosqlite:///workflow.db, your tables must also be in workflow.db
Best Practice: Use the same database for both Edda and your application tables. This simplifies deployment and ensures transactional consistency.
Key Points¶
- Database Requirement: See Database Requirements above for PostgreSQL/MySQL/SQLite specifics
- Session Access: ctx.session provides Edda's managed session
- Automatic Management: Edda handles begin/commit/rollback automatically
- Atomic Guarantee: Order creation + event publishing commit together
- Rollback Safety: If any step fails, everything rolls back
What Happens¶
1. Order saved to YOUR database (orders table)
2. Event saved to Edda's outbox (outbox_events table)
3. Both committed atomically (COMMIT)
4. Outbox Relayer publishes event (async, background)
5. Event delivered to broker (at-least-once guarantee)
Complete Example¶
See the complete example at examples/with_outbox.py.
Run the example:
# Set broker URL (or use default)
export BROKER_URL="http://localhost:8080/events"
# Run the workflow
uv run python examples/with_outbox.py
Expected Output:
[Activity] Reserving inventory for order ORD-12345
[Activity] Event 'inventory.reserved' written to outbox
[Activity] Charging payment for order ORD-12345: $149.99
[Activity] Event 'payment.charged' written to outbox
[Activity] Shipping order ORD-12345
[Activity] Event 'order.shipped' written to outbox
[Outbox Relayer] Publishing 3 pending events...
[Outbox Relayer] ✅ Published: inventory.reserved
[Outbox Relayer] ✅ Published: payment.charged
[Outbox Relayer] ✅ Published: order.shipped
What You Learned¶
- ✅ Transactional Outbox Pattern: Solves the dual-write problem
- ✅ send_event_transactional(): Writes events to outbox table
- ✅ ctx.session: Access Edda's managed session for custom database operations
- ✅ Automatic Transaction Management: Edda handles begin/commit/rollback
- ✅ Rollback Safety: All operations roll back on failure
- ✅ Outbox Relayer: Publishes events asynchronously with retry
- ✅ At-Least-Once Delivery: Events may be delivered more than once, so consumers must be idempotent
- ✅ CloudEvents Support: Standard event format
- ✅ Pydantic Integration: Type-safe event data
Next Steps¶
- Saga Pattern: Combine with compensation
- CloudEvents HTTP Binding: CloudEvents specification compliance
- Hooks: Monitor outbox events with lifecycle hooks