Event Waiting Example¶
This example demonstrates how workflows can wait for external events without blocking worker processes.
What This Example Shows¶
- ✅
wait_event()for waiting for external events - ✅
wait_timer()for time-based waiting - ✅ Process-releasing behavior (workflow pauses, worker is freed)
- ✅ Event-driven workflow continuation
The Problem¶
Traditional approaches keep the async task in memory while waiting:
# ❌ Bad: Keeps coroutine in memory for 1 hour
await asyncio.sleep(3600) # Workflow state held in RAM unnecessarily
Edda's wait_event() and wait_timer() persist the workflow state to the database and release the memory, allowing the async task to be garbage collected. The worker can then handle other workflows.
Code Overview¶
Wait for External Event¶
from edda import workflow, activity, wait_event, WorkflowContext
@activity
async def start_payment_processing(ctx: WorkflowContext, order_id: str):
"""Initiate payment processing with external service."""
print(f"🔄 Starting payment for order {order_id}")
# Call external payment service API...
return {"payment_id": f"PAY-{order_id}", "status": "pending"}
@workflow
async def payment_workflow(ctx: WorkflowContext, order_id: str):
"""
Payment workflow that waits for external payment completion event.
Note: Activity IDs are auto-generated for sequential execution.
"""
# Step 1: Start payment processing (auto-generated ID: "start_payment_processing:1")
payment = await start_payment_processing(ctx, order_id)
print(f"Payment started: {payment['payment_id']}")
# Step 2: Wait for payment completion event
# Workflow pauses here, worker process is released
print("⏸️ Waiting for payment.completed event...")
event = await wait_event(
ctx,
event_type="payment.completed",
timeout_seconds=300 # 5-minute timeout
)
# Step 3: Process payment result
print(f"✅ Payment completed: {event.data}")
return {"status": "completed", "payment_result": event.data}
Wait for Timer¶
from edda import wait_timer
@workflow
async def order_with_timeout(ctx: WorkflowContext, order_id: str):
"""
Order workflow with payment timeout.
Note: Activity IDs are auto-generated for sequential execution.
"""
# Step 1: Create order (auto-generated ID: "create_order:1")
await create_order(ctx, order_id)
print(f"Order {order_id} created")
# Step 2: Wait 60 seconds for payment
print("⏱️ Waiting 60 seconds for payment...")
await wait_timer(ctx, duration_seconds=60)
# Step 3: Check payment status (auto-generated ID: "check_payment_status:1")
status = await check_payment_status(ctx, order_id)
if status["paid"]:
print("✅ Payment received!")
return {"status": "completed"}
else:
print("❌ Payment timeout - cancelling order")
# Step 4: Cancel order (auto-generated ID: "cancel_order:1")
await cancel_order(ctx, order_id)
return {"status": "cancelled", "reason": "payment_timeout"}
How It Works¶
Event Waiting Flow¶
1. Workflow executes: start_payment_processing()
2. Workflow hits: wait_event()
3. Workflow pauses (status="waiting_for_event")
4. Worker process is RELEASED (can handle other workflows)
5. External event arrives (e.g., CloudEvent)
6. Workflow RESUMES from wait_event()
7. Workflow continues: process payment result
ReceivedEvent Structure¶
from edda.events import ReceivedEvent
event = await wait_event(ctx, "payment.completed")
# event is a ReceivedEvent instance
print(event.type) # "payment.completed"
print(event.source) # "payment-service"
print(event.data) # {"transaction_id": "...", "amount": 99.99}
Type-Safe Events with Pydantic¶
Use Pydantic models for type-safe event data access:
from pydantic import BaseModel
class PaymentCompleted(BaseModel):
order_id: str
transaction_id: str
amount: float
status: str
@workflow
async def payment_workflow_typed(ctx: WorkflowContext, order_id: str):
"""
Payment workflow with type-safe event handling.
"""
# Wait for event with Pydantic model
event = await wait_event(
ctx,
event_type="payment.completed",
model=PaymentCompleted # Type-safe conversion
)
# Type-safe access with IDE completion
amount = event.data.amount # ✅ Type-safe (float)
transaction_id = event.data.transaction_id # ✅ Type-safe (str)
order_id = event.data.order_id # ✅ Type-safe (str)
print(f"✅ Payment of ${amount} completed for order {order_id}")
print(f" Transaction ID: {transaction_id}")
return {"status": "completed", "amount": amount}
Benefits of Pydantic models: - ✅ Type safety: IDE autocomplete and mypy validation - ✅ Runtime validation: Automatic data validation when event arrives - ✅ Clear contracts: Explicit event structure definition - ✅ Error detection: Invalid events fail fast with clear error messages
Without Pydantic (dict access):
event = await wait_event(ctx, "payment.completed")
amount = event.data["amount"] # ⚠️ No type checking, typo possible
With Pydantic (model access):
event = await wait_event(ctx, "payment.completed", model=PaymentCompleted)
amount = event.data.amount # ✅ Type-safe, IDE autocomplete
Benefits¶
1. Resource Efficiency¶
# ❌ Bad: Keeps workflow state in memory for 1 hour
@workflow
async def bad_workflow(ctx: WorkflowContext):
await asyncio.sleep(3600) # Task held in RAM!
# ✅ Good: Persists state and releases memory
@workflow
async def good_workflow(ctx: WorkflowContext):
await wait_timer(ctx, duration_seconds=3600) # Memory freed!
Impact:
- Bad: 1 worker holds 1 workflow state in RAM (wasted memory)
- Good: 1 worker can handle 1000s of workflows (state persisted to DB)
2. Long-Running Workflows¶
Perfect for workflows that span hours or days:
@workflow
async def loan_approval_workflow(ctx: WorkflowContext, application_id: str):
# Submit for manual review
await submit_for_review(ctx, application_id)
# Wait up to 48 hours for approval
event = await wait_event(
ctx,
event_type="loan.approved",
timeout_seconds=48 * 3600
)
# Process approval
await process_approval(ctx, event.data)
3. Event-Driven Architecture¶
Integrate with event-driven systems:
@workflow
async def order_fulfillment(ctx: WorkflowContext, order_id: str):
# Wait for warehouse to pack the order
pack_event = await wait_event(ctx, "order.packed")
# Wait for carrier to pick up
pickup_event = await wait_event(ctx, "order.picked_up")
# Wait for delivery confirmation
delivery_event = await wait_event(ctx, "order.delivered")
return {"status": "delivered"}
Sending Events¶
To resume a waiting workflow, send a CloudEvent:
# Using curl
curl -X POST http://localhost:8001/events \
-H "Content-Type: application/cloudevents+json" \
-d '{
"specversion": "1.0",
"type": "payment.completed",
"source": "payment-service",
"id": "event-123",
"datacontenttype": "application/json",
"data": {
"order_id": "ORD-123",
"transaction_id": "TXN-456",
"amount": 99.99,
"status": "success"
}
}'
Expected Response:
Status Codes:
- 202 Accepted: Event accepted for processing ✅
- 400 Bad Request: Invalid CloudEvent format (non-retryable) ❌
- 500 Internal Server Error: Server error (retryable) ⚠️
See CloudEvents HTTP Binding for detailed error handling and retry logic.
Or programmatically:
from edda.events import send_event
await send_event(
event_type="payment.completed",
source="payment-service",
data={
"order_id": "ORD-123",
"transaction_id": "TXN-456",
"amount": 99.99,
"status": "success"
}
)
Running the Example¶
Create a file named event_waiting_workflow.py with the code shown above, then run:
# Install Edda if you haven't already
uv add edda-framework
# Run your workflow
uv run python event_waiting_workflow.py
Complete Code¶
See the full implementation in examples/event_waiting_workflow.py.
What You Learned¶
- ✅
wait_event(): Wait for external events - ✅
wait_timer(): Wait for specific duration - ✅ Process Releasing: Workers are freed during wait
- ✅ ReceivedEvent: Typed event data access
- ✅ CloudEvents: Standard event format support
Next Steps¶
- CloudEvents HTTP Binding: Deep dive into CloudEvents integration
- Core Concepts: Learn about workflows, activities, and events
- Transactional Outbox: Reliable event publishing