Event Waiting and Timers¶
Edda lets workflows pause and wait for external events or timers without blocking worker processes. Workers stay free for other work while a workflow waits, which supports event-driven workflow orchestration.
Overview¶
Event waiting in Edda allows workflows to:
- Pause execution and wait for external events
- Release worker processes while waiting (non-blocking)
- Resume automatically when events arrive
- Handle timeouts gracefully
- Wait for specific durations with timers
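The pause, resume, and timeout behavior listed above can be pictured with plain asyncio. This is only an in-process analogy and not how Edda is implemented: Edda releases the worker while a workflow waits, whereas this sketch keeps a suspended coroutine in memory. The names `EventBus`, `wait_for`, and `deliver` are hypothetical and are not part of Edda.

```python
from __future__ import annotations

import asyncio


class EventBus:
    """Hypothetical in-memory stand-in for an event router (not an Edda class)."""

    def __init__(self) -> None:
        self._waiters: dict[str, list[asyncio.Future]] = {}

    async def wait_for(self, event_type: str, timeout_seconds: float | None = None) -> dict:
        # Register a future and suspend until it is resolved or the timeout passes.
        future = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(event_type, []).append(future)
        try:
            return await asyncio.wait_for(future, timeout_seconds)
        finally:
            # Drop the registration whether we resumed or timed out.
            self._waiters[event_type].remove(future)

    def deliver(self, event_type: str, data: dict) -> int:
        # Resume every waiter registered for exactly this event type.
        pending = [f for f in self._waiters.get(event_type, []) if not f.done()]
        for future in pending:
            future.set_result(data)
        return len(pending)


async def demo() -> tuple[dict, bool]:
    bus = EventBus()
    waiter = asyncio.create_task(bus.wait_for("payment.completed", timeout_seconds=1))
    await asyncio.sleep(0)  # let the waiter register itself
    bus.deliver("payment.completed", {"payment_id": "PAY-123"})
    received = await waiter

    # Nobody delivers this time, so the wait ends with a timeout.
    timed_out = False
    try:
        await bus.wait_for("payment.completed", timeout_seconds=0.01)
    except asyncio.TimeoutError:
        timed_out = True
    return received, timed_out
```

Calling `asyncio.run(demo())` exercises both paths: one wait that is resumed by a delivered event and one that ends in a timeout.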
Key Functions¶
wait_event()¶
Wait for an external event with optional timeout:
from edda import wait_event, WorkflowContext

async def my_workflow(ctx: WorkflowContext):
    # Wait for an event (with 5-minute timeout)
    event = await wait_event(
        ctx,
        event_type="payment.completed",
        timeout_seconds=300,  # Optional timeout
    )
    # Access event data
    payment_id = event.data["payment_id"]
    amount = event.data["amount"]
wait_timer()¶
Wait for a specific duration:
from edda import wait_timer, WorkflowContext

async def my_workflow(ctx: WorkflowContext):
    # Wait for 60 seconds
    await wait_timer(ctx, duration_seconds=60)
    # Continue execution after timer expires
    await process_timeout()
Detailed API Reference¶
wait_event()¶
async def wait_event(
    ctx: WorkflowContext,
    event_type: str,
    timeout_seconds: int | None = None,
    model: type[BaseModel] | None = None,
) -> ReceivedEvent: ...
Parameters¶
- ctx: The workflow context
- event_type: CloudEvents type to wait for (e.g., "payment.completed")
- timeout_seconds: Optional timeout in seconds
- model: Optional Pydantic model for automatic data validation
Returns¶
ReceivedEvent object containing:
- data: Event data (dict, or a Pydantic model instance if model is specified)
- type: CloudEvents type
- source: Event source
- subject: Optional event subject
- time: Event timestamp
- id: Event ID
Example with Pydantic¶
from pydantic import BaseModel
from edda import wait_event, workflow, WorkflowContext

class PaymentCompleted(BaseModel):
    payment_id: str
    amount: float
    status: str

@workflow
async def payment_workflow(ctx: WorkflowContext, order_id: str):
    # Wait with automatic validation
    event = await wait_event(
        ctx,
        event_type="payment.completed",
        timeout_seconds=300,
        model=PaymentCompleted,  # Automatic validation
    )
    # event.data is now a PaymentCompleted instance
    if event.data.status == "success":
        return {"order_id": order_id, "payment_id": event.data.payment_id}
wait_timer()¶
async def wait_timer(
    ctx: WorkflowContext,
    duration_seconds: float,
    timer_id: str | None = None,
) -> None: ...
Parameters¶
- ctx: The workflow context
- duration_seconds: Duration to wait in seconds
- timer_id: Optional custom timer ID (auto-generated if not provided)
Example¶
from edda import wait_timer, workflow, WorkflowContext

@workflow
async def scheduled_task(ctx: WorkflowContext, task_id: str):
    # Execute immediately
    await perform_initial_task(ctx, task_id)
    # Wait 1 hour
    await wait_timer(ctx, duration_seconds=3600)
    # Execute after delay
    await perform_delayed_task(ctx, task_id)
Common Patterns¶
Payment Processing with Timeout¶
@workflow
async def payment_with_timeout(ctx: WorkflowContext, order_id: str):
    """Process payment with automatic cancellation on timeout."""
    # Initiate payment
    payment_id = await initiate_payment(ctx, order_id)
    try:
        # Wait for payment completion (5-minute timeout)
        event = await wait_event(
            ctx,
            event_type="payment.completed",
            timeout_seconds=300,
        )
        if event.data["status"] == "success":
            await fulfill_order(ctx, order_id)
            return {"status": "completed"}
        else:
            await cancel_order(ctx, order_id)
            return {"status": "payment_failed"}
    except TimeoutError:
        # Timeout occurred
        await cancel_payment(ctx, payment_id)
        await cancel_order(ctx, order_id)
        return {"status": "timeout"}
Approval Workflow¶
@workflow
async def approval_workflow(ctx: WorkflowContext, request_id: str):
    """Wait for manager approval with escalation."""
    # Send approval request
    await send_approval_request(ctx, request_id, level="manager")
    # Wait for manager approval (1 day)
    try:
        event = await wait_event(
            ctx,
            event_type="approval.decision",
            timeout_seconds=86400,  # 24 hours
        )
        return {"approved": event.data["approved"], "approver": event.data["approver"]}
    except TimeoutError:
        # Escalate to director
        await send_approval_request(ctx, request_id, level="director")
        event = await wait_event(
            ctx,
            event_type="approval.decision",
            timeout_seconds=86400,
        )
        return {"approved": event.data["approved"], "approver": event.data["approver"], "escalated": True}
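In the approval workflow above, the second wait (for the director) can itself time out, and that TimeoutError is not caught. One possible generalization is a loop over escalation levels that also handles the case where nobody answers. The sketch below is an assumption-laden illustration: `escalate_until_decided` and the injected `wait_for_decision` callable are hypothetical names, and in a real workflow that callable would presumably send the approval request and then call wait_event().

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

# Hypothetical signature: waits for a decision and raises TimeoutError on expiry.
WaitForDecision = Callable[[str, int], Awaitable[dict]]


async def escalate_until_decided(
    levels: list[str],
    wait_for_decision: WaitForDecision,
    timeout_seconds: int = 86400,
) -> dict:
    """Ask each level in turn; move on only when the previous one times out."""
    for index, level in enumerate(levels):
        try:
            decision = await wait_for_decision(level, timeout_seconds)
        except TimeoutError:
            continue  # nobody answered at this level, try the next one
        return {**decision, "level": level, "escalated": index > 0}
    # Every level timed out: report that explicitly instead of raising.
    return {"approved": False, "level": None, "escalated": True, "reason": "timeout"}


async def fake_wait(level: str, timeout_seconds: int) -> dict:
    # Test double: the manager never answers, the director approves.
    if level == "manager":
        raise TimeoutError
    return {"approved": True, "approver": f"{level}@example.com"}


async def never_answers(level: str, timeout_seconds: int) -> dict:
    # Test double: every level times out.
    raise TimeoutError
```

Injecting the wait function keeps the escalation logic testable without a running workflow engine.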
Batch Processing with Delays¶
@workflow
async def batch_processor(ctx: WorkflowContext, batch_id: str, items: list[dict]):
    """Process items in batches with delays to avoid rate limiting."""
    results = []
    batch_size = 10
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Process batch
        batch_results = await process_batch(ctx, batch)
        results.extend(batch_results)
        # Wait between batches (except for last batch)
        if i + batch_size < len(items):
            await wait_timer(ctx, duration_seconds=5)
    return {"batch_id": batch_id, "results": results}
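The slicing and the "pause except after the last batch" check in the example above can be pulled into a small pure function, which makes the boundary cases easy to test in isolation. This is a sketch only; `batches_with_pause_flag` is a hypothetical helper name, not an Edda function.

```python
from __future__ import annotations

from typing import Iterator, Sequence


def batches_with_pause_flag(items: Sequence, batch_size: int) -> Iterator[tuple[list, bool]]:
    """Yield (batch, pause_after); pause_after is False only for the last batch."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        end = start + batch_size
        # A pause is only needed when more items remain after this batch.
        yield list(items[start:end]), end < len(items)
```

Inside a workflow, the loop body would then process each batch and call wait_timer() only when `pause_after` is true.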
Orchestrating Multiple Services¶
@workflow
async def multi_service_orchestration(ctx: WorkflowContext, request_id: str):
    """Orchestrate multiple async services."""
    # Start all services
    await trigger_service_a(ctx, request_id)
    await trigger_service_b(ctx, request_id)
    await trigger_service_c(ctx, request_id)
    # Wait for all services to complete
    results = {}
    # Wait for Service A
    event_a = await wait_event(ctx, "service.a.completed", timeout_seconds=600)
    results["service_a"] = event_a.data
    # Wait for Service B
    event_b = await wait_event(ctx, "service.b.completed", timeout_seconds=600)
    results["service_b"] = event_b.data
    # Wait for Service C
    event_c = await wait_event(ctx, "service.c.completed", timeout_seconds=600)
    results["service_c"] = event_c.data
    # Aggregate and return results
    return aggregate_results(results)
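Because the three waits above run one after another, each with its own 600-second timeout, the workflow can wait up to 1800 seconds in total before failing. One possible way to cap the overall wait is to derive each timeout from a shared deadline. The helper below is a hypothetical sketch, not an Edda feature. The clock is injectable because reading wall-clock time inside a replayed workflow may not be deterministic; whether that matters depends on how Edda replays workflow code, which this sketch does not assume.

```python
from __future__ import annotations

import math
import time
from typing import Callable


class DeadlineBudget:
    """Hypothetical helper that hands out what is left of one overall timeout."""

    def __init__(self, total_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._deadline = clock() + total_seconds

    def remaining_seconds(self) -> int:
        # Round up so a fractional remainder still yields a usable whole-second timeout.
        remaining = math.ceil(self._deadline - self._clock())
        if remaining <= 0:
            raise TimeoutError("overall deadline exceeded")
        return remaining
```

Each wait would then pass `timeout_seconds=budget.remaining_seconds()` instead of a fixed 600, so the three waits together never exceed the chosen total.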
Sending Events to Waiting Workflows¶
Using CloudEvents¶
Send CloudEvents to resume waiting workflows:
import asyncio

import httpx
from cloudevents.conversion import to_structured
from cloudevents.http import CloudEvent

async def send_payment_completed():
    # Create CloudEvent: attributes first, payload as the second argument
    attributes = {
        "type": "payment.completed",
        "source": "payment-service",
    }
    data = {
        "payment_id": "PAY-123",
        "amount": 99.99,
        "status": "success",
    }
    event = CloudEvent(attributes, data)

    # Send to Edda
    headers, body = to_structured(event)
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8001/",
            headers=headers,
            content=body,
        )
        response.raise_for_status()

asyncio.run(send_payment_completed())
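For senders that cannot use the cloudevents SDK, it may help to see roughly what a structured-mode request carries. The sketch below builds a CloudEvents 1.0 JSON envelope with the standard library only. `build_structured_cloudevent` is a hypothetical helper name, and whether the Edda endpoint expects any attributes beyond the required ones is not covered here.

```python
from __future__ import annotations

import json
import uuid
from datetime import datetime, timezone


def build_structured_cloudevent(event_type: str, source: str, data: dict) -> tuple[dict, bytes]:
    """Return (headers, body) for a CloudEvents 1.0 structured-mode JSON request."""
    envelope = {
        "specversion": "1.0",      # required by the CloudEvents spec
        "id": str(uuid.uuid4()),   # required; must be unique per source
        "source": source,          # required
        "type": event_type,        # required; the type a waiting workflow names in wait_event()
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }
    headers = {"content-type": "application/cloudevents+json"}
    return headers, json.dumps(envelope).encode("utf-8")
```

The returned headers and body can be passed to any HTTP client in the same way as the output of `to_structured()`.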
Using send_event()¶
Call send_event() from within a workflow or activity:
# Assumes activity is exported from the top-level edda package, like workflow
from edda import activity, send_event, WorkflowContext

@activity
async def process_payment(ctx: WorkflowContext, payment_data: dict):
    # Process payment...
    # Send event to resume waiting workflows
    await send_event(
        event_type="payment.completed",
        event_source="payment-processor",
        event_data={
            "payment_id": payment_data["id"],
            "amount": payment_data["amount"],
            "status": "success",
        },
    )
Best Practices¶
1. Always Set Timeouts¶
Prevent workflows from waiting indefinitely:
# Good: Has timeout
event = await wait_event(ctx, "payment.completed", timeout_seconds=300)
# Bad: No timeout (could wait forever)
event = await wait_event(ctx, "payment.completed")
2. Handle Timeout Gracefully¶
try:
    event = await wait_event(ctx, "approval.decision", timeout_seconds=3600)
    # Process approval
except TimeoutError:
    # Handle timeout scenario
    await handle_timeout_scenario(ctx)
3. Use Unique Event Types¶
Avoid event type collisions:
# Good: Specific event types
await wait_event(ctx, "order.payment.completed")
await wait_event(ctx, "subscription.payment.completed")
# Bad: Generic event type
await wait_event(ctx, "completed") # Too generic
4. Include Correlation Data¶
Embed a correlation identifier in the event type so that a workflow resumes only on events meant for it:
@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str):
    # Send order_id as correlation data
    await initiate_payment(ctx, order_id)
    # Wait for event with matching order_id
    event = await wait_event(
        ctx,
        event_type=f"payment.completed.{order_id}",
        timeout_seconds=300,
    )
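Since event type matching is an exact string comparison, the sender and the waiting workflow must build the correlated type in exactly the same way. A small shared helper is one way to keep both sides in step. `correlated_event_type` is a hypothetical name used for illustration, and the validation rules are an assumption rather than an Edda requirement.

```python
def correlated_event_type(base_type: str, correlation_id: str) -> str:
    """Build '<base_type>.<correlation_id>' the same way on both sides."""
    if not base_type or not correlation_id:
        raise ValueError("base_type and correlation_id must be non-empty")
    if any(ch.isspace() for ch in correlation_id):
        # Whitespace is easy to lose in transit and would break exact matching.
        raise ValueError("correlation_id must not contain whitespace")
    return f"{base_type}.{correlation_id}"
```

The workflow would pass the result as `event_type` to wait_event(), and the payment service would use the same function when setting the CloudEvents type.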
5. Consider Idempotency¶
Ensure events can be safely redelivered:
@workflow
async def idempotent_workflow(ctx: WorkflowContext, request_id: str):
    # Check if already processed
    if await is_already_processed(ctx, request_id):
        return await get_previous_result(ctx, request_id)
    # Wait for event
    event = await wait_event(ctx, f"process.{request_id}")
    # Process idempotently
    result = await process_idempotently(ctx, event.data)
    return result
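The helpers in the example above (is_already_processed, process_idempotently) are left undefined. One common way to get that behavior is to key results by the event ID, so that a redelivered event returns the stored result instead of running the handler again. The class below is a minimal in-memory sketch under that assumption. `ProcessedEvents` is a hypothetical name, and a real system would keep this record in durable storage.

```python
from __future__ import annotations

from typing import Any, Callable


class ProcessedEvents:
    """Hypothetical in-memory record of handled event IDs and their results."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}

    def run_once(self, event_id: str, handler: Callable[[dict], Any], data: dict) -> Any:
        # A redelivered event returns the stored result without re-running the handler.
        if event_id in self._results:
            return self._results[event_id]
        result = handler(data)
        self._results[event_id] = result
        return result
```

Keying on the event ID rather than on the payload means two distinct events with identical data are still processed separately.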
Advanced Topics¶
Multiple Event Types¶
wait_event() currently waits for a single event type. To react to one of several types, use separate wait_event() calls or encode the variant in the event type:
@workflow
async def multi_event_workflow(ctx: WorkflowContext, order_id: str):
    # Option 1: Sequential checks (success first, then failure)
    try:
        event = await wait_event(ctx, "payment.completed", timeout_seconds=60)
    except TimeoutError:
        event = await wait_event(ctx, "payment.failed", timeout_seconds=60)
    # Process based on event type
    if event.type == "payment.completed":
        return {"status": "success"}
    else:
        return {"status": "failed"}
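The sequential approach above only starts looking for payment.failed after the first wait has timed out, so a failure is reported at least 60 seconds late. An alternative, if the publisher can be changed, is to emit a single event type that carries the outcome in its data and to branch on that field after one wait. The sketch below shows only the branching step. The `payment.result` type mentioned afterwards, the `status` and `reason` fields, and the function name are assumptions for illustration.

```python
def outcome_from_result(data: dict) -> dict:
    """Map the payload of a single 'result' event to a workflow outcome."""
    status = data.get("status")
    if status == "success":
        return {"status": "success"}
    if status == "failed":
        return {"status": "failed", "reason": data.get("reason", "unknown")}
    # Surface unknown values instead of silently treating them as failures.
    raise ValueError(f"unexpected payment status: {status!r}")
```

The workflow would then wait once for a type such as `payment.result` and return `outcome_from_result(event.data)`.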
Event Filtering¶
Filter events based on data:
@workflow
async def filtered_event_workflow(ctx: WorkflowContext, user_id: str):
    # Wait for event specific to this user
    event = await wait_event(
        ctx,
        event_type=f"user.action.{user_id}",
        timeout_seconds=300,
    )
    # Additional filtering can be done after receiving
    if event.data.get("action") == "purchase":
        await process_purchase(ctx, event.data)
Cancellation¶
Workflows waiting for events can be cancelled:
# Via API
POST /cancel/{instance_id}
# Or programmatically
from edda.replay import ReplayEngine
engine = ReplayEngine(storage)
await engine.cancel_workflow(instance_id)
Monitoring and Debugging¶
Viewer UI¶
The Edda Viewer shows waiting workflows:
- Status: waiting_for_event or waiting_for_timer
- Event type being waited for
- Timeout information
- Time elapsed
Logging¶
Add logging for debugging:
import logging

logger = logging.getLogger(__name__)

@workflow
async def logged_workflow(ctx: WorkflowContext):
    logger.info(f"Waiting for payment event at step {ctx.current_step}")
    event = await wait_event(ctx, "payment.completed", timeout_seconds=300)
    logger.info(f"Received event: {event.id} with data: {event.data}")
Performance Considerations¶
Worker Release¶
- Waiting workflows don't consume worker resources
- Workers can handle other workflows while waiting
- Scales to thousands of waiting workflows
Event Matching¶
- Event type matching is exact (string comparison)
- Consider using hierarchical event types for flexibility
- Index on event_type for performance
Timer Precision¶
- Timers are checked every 10 seconds by default
- A timer can therefore fire up to 10 seconds after its nominal expiration
- Configure check interval for different precision needs
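The effect of the check interval is easiest to judge relative to the timer's duration. The function below is a small sketch that assumes a timer fires on the first check at or after its expiration, so the delay is at most one interval. `timer_firing_window` is a hypothetical name, not an Edda API.

```python
from __future__ import annotations


def timer_firing_window(
    duration_seconds: float, check_interval_seconds: float = 10.0
) -> tuple[float, float]:
    """Earliest and latest time (seconds after scheduling) at which a polled timer can fire."""
    if duration_seconds < 0 or check_interval_seconds <= 0:
        raise ValueError("duration must be >= 0 and check interval must be > 0")
    # Earliest: a check happens right at expiration. Latest: one full interval later.
    return duration_seconds, duration_seconds + check_interval_seconds
```

For a one-hour timer the extra 10 seconds is negligible, but a 5-second timer such as the one in the batch processing example could take up to 15 seconds with the default interval.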
Related Topics¶
- CloudEvents HTTP Binding - CloudEvents integration
- Workflows and Activities - Basic workflow concepts
- Saga Pattern - Compensation and rollback
- Examples: Event Waiting - Practical examples