API Reference¶
Core¶
EddaApp¶
ASGI/WSGI compatible workflow application with distributed execution support.
This is the main entry point for the Edda framework. It handles:

- CloudEvents HTTP endpoint
- Event routing and workflow triggering
- Distributed locking and coordination
- Storage management
__call__(scope, receive, send)
async
¶
ASGI interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| scope | | ASGI scope dictionary | required |
| receive | | Async function to receive messages | required |
| send | | Async function to send messages | required |
__init__(service_name, db_url, outbox_enabled=False, broker_url=None, hooks=None, default_retry_policy=None, message_retention_days=7, pool_size=5, max_overflow=10, pool_timeout=30, pool_recycle=3600, pool_pre_ping=True, use_listen_notify=None, notify_fallback_interval=30, max_workflows_per_batch=10, leader_heartbeat_interval=15, leader_lease_duration=45)
¶
Initialize Edda application.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | | Service name for distributed execution (e.g., "order-service") | required |
| db_url | | Database URL (e.g., "sqlite:///workflow.db") | required |
| outbox_enabled | | Enable transactional outbox pattern | False |
| broker_url | | Broker URL for outbox publishing. Required if outbox_enabled=True. | None |
| hooks | | Optional WorkflowHooks implementation for observability | None |
| default_retry_policy | | Default retry policy for all activities. If None, uses DEFAULT_RETRY_POLICY (5 attempts, exponential backoff). Can be overridden per-activity using @activity(retry_policy=...). | None |
| message_retention_days | | Number of days to retain channel messages before automatic cleanup. Messages older than this are deleted by a background task running every hour. | 7 |
| pool_size | | Number of connections to keep open in the pool. Ignored for SQLite. For production, consider 20+. | 5 |
| max_overflow | | Maximum number of connections to create above pool_size. Ignored for SQLite. For production, consider 40+. | 10 |
| pool_timeout | | Seconds to wait for a connection from the pool. Ignored for SQLite. | 30 |
| pool_recycle | | Seconds before a connection is recycled. Helps prevent stale connections. Ignored for SQLite. | 3600 |
| pool_pre_ping | | If True, test connections before use. Helps detect disconnected connections. Ignored for SQLite. | True |
| use_listen_notify | | Enable PostgreSQL LISTEN/NOTIFY for instant notifications. None (default) = auto-detect (enabled for PostgreSQL, disabled for others); True = force enable (raises an error if the database is not PostgreSQL); False = force disable (polling only). | None |
| notify_fallback_interval | | Polling interval in seconds when NOTIFY is enabled, used as a backup for missed notifications. SQLite/MySQL always use their default polling intervals. | 30 |
| max_workflows_per_batch | | Maximum workflows to process per resume cycle. An int gives a fixed batch size; "auto" scales 10-100 based on queue depth; "auto:cpu" scales 10-100 based on CPU utilization (requires psutil). | 10 |
| leader_heartbeat_interval | | Interval in seconds for the leader heartbeat. Controls how often workers attempt to become or maintain the leader. | 15 |
| leader_lease_duration | | Duration in seconds of the leader lease. If the leader fails to heartbeat within this time, another worker takes over. | 45 |
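A minimal construction sketch is shown below; the service name, database URL, and pool settings are illustrative values, and the uvicorn invocation is just one way to serve the ASGI app.

```python
# Hedged sketch: constructing an EddaApp and serving it as an ASGI application.
# The service name, database URL, and settings below are illustrative only.
from edda import EddaApp

app = EddaApp(
    service_name="order-service",    # name used for distributed coordination
    db_url="sqlite:///workflow.db",  # SQLAlchemy-style URL; PostgreSQL enables LISTEN/NOTIFY
    pool_size=5,
    max_workflows_per_batch=10,
)

# The app is ASGI-compatible, so any ASGI server can run it, e.g.:
#   uvicorn my_module:app
```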
find_instances(*, input_filters=None, status=None, workflow_name=None, instance_id=None, started_after=None, started_before=None, limit=50, page_token=None)
async
¶
Find workflow instances with filtering support.
This is a high-level API for querying workflow instances by various criteria, including input parameter values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_filters | | Filter by input data values. Keys are JSON paths, values are expected values (exact match). Example: {"order_id": "ORD-123"} | None |
| status | | Filter by workflow status (e.g., "running", "completed") | None |
| workflow_name | | Filter by workflow name (partial match, case-insensitive) | None |
| instance_id | | Filter by instance ID (partial match, case-insensitive) | None |
| started_after | | Filter instances started after this datetime (inclusive) | None |
| started_before | | Filter instances started before this datetime (inclusive) | None |
| limit | | Maximum number of instances to return per page | 50 |
| page_token | | Cursor for pagination (from previous response) | None |
Returns:
| Type | Description |
|---|---|
| | Dictionary containing the matching instances (exposed under the "instances" key, as shown in the Example) and pagination metadata |
Example
Find all instances with order_id = "ORD-123"¶
result = await app.find_instances(input_filters={"order_id": "ORD-123"})
for instance in result["instances"]:
    print(f"{instance['instance_id']}: {instance['status']}")
Find running instances with specific customer¶
result = await app.find_instances(
    input_filters={"customer_id": "CUST-456"},
    status="running",
)
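A paginated-loop sketch follows. The key that carries the next-page cursor is not shown in this reference, so "next_page_token" below is a hypothetical name; adjust it to whatever key your find_instances() response actually returns.

```python
# Hedged sketch of cursor pagination over find_instances().
# "next_page_token" is an assumed response key, not confirmed by this reference.
token = None
while True:
    result = await app.find_instances(status="running", limit=50, page_token=token)
    for instance in result["instances"]:
        print(instance["instance_id"])
    token = result.get("next_page_token")  # hypothetical key name
    if not token:
        break
```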
handle_cloudevent(event, wait=False)
async
¶
Handle incoming CloudEvent.
This will route the event to registered handlers and deliver events to waiting workflows.
By default, handlers are executed as background tasks to avoid blocking the HTTP response. Set wait=True for synchronous execution (useful for testing).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | | CloudEvent instance | required |
| wait | | If True, wait for handlers to complete before returning. If False (default), execute handlers as background tasks. | False |
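A hedged testing sketch follows. It assumes the CloudEvents Python SDK's CloudEvent type is what this method accepts; if Edda expects its own event class, substitute that instead.

```python
# Hedged sketch: delivering an event synchronously so test assertions can run
# immediately after handling. The CloudEvents SDK usage is an assumption.
from cloudevents.http import CloudEvent

event = CloudEvent(
    {"type": "order.created", "source": "test-suite"},
    {"order_id": "ORD-123"},
)
await app.handle_cloudevent(event, wait=True)  # wait=True blocks until handlers finish
```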
initialize()
async
¶
Initialize the application.
This should be called before the app starts receiving requests.
on_event(event_type, proto_type=None)
¶
Decorator to register an event handler.
Example
@app.on_event("order.created")
async def handle_order_created(event):
    await order_workflow.start(...)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_type | | CloudEvent type to handle | required |
| proto_type | | Optional protobuf message type | None |
Returns:
| Type | Description |
|---|---|
| | Decorator function |
shutdown()
async
¶
Shutdown the application and cleanup resources.
This should be called when the app is shutting down.
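A hedged lifecycle sketch, tying initialize() and shutdown() together; the surrounding runner code is illustrative, not an Edda requirement.

```python
# Hedged sketch: initialize() before serving, shutdown() on exit.
import asyncio

async def main():
    await app.initialize()
    try:
        ...  # serve requests / run your workload here
    finally:
        await app.shutdown()

asyncio.run(main())
```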
workflow¶
Workflow module for Edda framework.
This module provides the @workflow decorator for defining workflow functions and managing workflow instances.
RecurException
¶
Bases:
Exception raised to signal that a workflow should recur (restart with fresh history).
This is similar to Erlang's tail recursion pattern - it prevents unbounded history growth in long-running loops by completing the current workflow instance and starting a new one with the provided arguments.
The workflow's history is archived (not deleted) and a new instance is created with a reference to the previous instance (continued_from).
Note
This exception should not be caught by user code. It is handled internally by the ReplayEngine.
Example
@workflow
async def notification_service(ctx: WorkflowContext, processed_count: int = 0):
    await join_group(ctx, group="order_watchers")

    count = 0
    while True:
        msg = await wait_message(ctx, channel="order.completed")
        await send_notification(ctx, msg.data, activity_id=f"notify:{msg.id}")

        count += 1
        if count >= 1000:
            # Reset history by recurring with new state
            await ctx.recur(processed_count=processed_count + count)
__init__(kwargs)
¶
Initialize RecurException.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| kwargs | | Keyword arguments to pass to the new workflow instance | required |
Workflow
¶
Wrapper class for workflow functions.
Provides methods for starting and managing workflow instances.
__call__(*args, **kwargs)
async
¶
Direct call to the workflow function.
This is typically used during replay by the replay engine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *args | | Positional arguments | () |
| **kwargs | | Keyword arguments | {} |
Returns:
| Type | Description |
|---|---|
| | Workflow result |
__init__(func, event_handler=False, lock_timeout_seconds=None)
¶
Initialize workflow wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | | The async function to wrap as a workflow | required |
| event_handler | | Whether to auto-register as a CloudEvent handler | False |
| lock_timeout_seconds | | Default lock timeout for this workflow (None = global default of 300s) | None |
resume(instance_id, event=None)
async
¶
Resume an existing workflow instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Workflow instance ID | required |
| event | | Optional event that triggered the resume | None |
Raises:
| Type | Description |
|---|---|
| | If the replay engine is not initialized |
start(lock_timeout_seconds=None, **kwargs)
async
¶
Start a new workflow instance.
Pydantic models in kwargs are automatically converted to JSON-compatible dicts for storage. During execution, they will be restored back to Pydantic models based on the workflow function's type hints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| lock_timeout_seconds | | Override the lock timeout for this specific execution (None = use the decorator default or the global default of 300s) | None |
| **kwargs | | Input parameters for the workflow (can include Pydantic models) | {} |
Returns:
| Type | Description |
|---|---|
| | Instance ID of the started workflow |
Raises:
| Type | Description |
|---|---|
| | If the replay engine is not initialized |
get_all_workflows()
¶
Get all registered workflow definitions.
Returns:
| Type | Description |
|---|---|
| | Dictionary mapping workflow names to Workflow instances |
set_replay_engine(engine)
¶
Set the global replay engine.
This is called by EddaApp during initialization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| engine | | ReplayEngine instance | required |
workflow(func=None, *, event_handler=False, lock_timeout_seconds=None)
¶
Decorator for defining workflows.
Workflows are the top-level orchestration functions that coordinate multiple activities. They support deterministic replay and can wait for external events.
By default, workflows are NOT automatically registered as CloudEvent handlers. Set event_handler=True to enable automatic CloudEvent handling.
Example
Basic workflow (manual event handling)¶
@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str, amount: int):
    inventory = await reserve_inventory(ctx, order_id)
    payment = await process_payment(ctx, order_id, amount)
    return {"status": "completed"}

# Start the workflow manually
instance_id = await order_workflow.start(order_id="123", amount=100)
Workflow with automatic CloudEvent handling¶
@workflow(event_handler=True)
async def auto_workflow(ctx: WorkflowContext, **kwargs):
    # This will automatically handle CloudEvents with type="auto_workflow"
    pass
Workflow with custom lock timeout¶
@workflow(lock_timeout_seconds=600)
async def long_running_workflow(ctx: WorkflowContext, **kwargs):
    # This workflow uses a 10-minute lock timeout instead of the default 5 minutes
    pass
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | | Async function to wrap as a workflow | None |
| event_handler | | If True, automatically register as a CloudEvent handler | False |
| lock_timeout_seconds | | Default lock timeout for this workflow (None = global default of 300s) | None |
Returns:
| Type | Description |
|---|---|
| | Decorated Workflow instance |
activity¶
Activity module for Edda framework.
This module provides the @activity decorator for defining atomic units of work within workflows. Activities are the building blocks of Sagas and support deterministic replay through result caching.
Activity
¶
Wrapper class for activity functions.
Handles execution, result caching during replay, and history recording. Supports automatic retry with exponential backoff.
__call__(ctx, *args, **kwargs)
async
¶
Execute the activity with automatic retry.
During replay, returns cached result. During normal execution, executes the function with retry logic and records the result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| *args | | Positional arguments for the activity | () |
| **kwargs | | Keyword arguments for the activity. Optionally accepts activity_id (str), an explicit activity ID: auto-generated by default (format: "{function_name}:{counter}"); manual specification is required only for concurrent execution (asyncio.gather, async for, etc.); for sequential execution, rely on auto-generation. | {} |
Returns:
| Type | Description |
|---|---|
| | Activity result |
Raises:
| Type | Description |
|---|---|
| | When all retry attempts are exhausted |
| | For non-retryable errors |
| | When the workflow is cancelled |
Example
Sequential execution (auto-generated IDs - recommended)::
result1 = await my_activity(ctx, arg1) # Auto: "my_activity:1"
result2 = await my_activity(ctx, arg2) # Auto: "my_activity:2"
Concurrent execution (manual IDs - required)::
results = await asyncio.gather(
my_activity(ctx, arg1, activity_id="my_activity:1"),
my_activity(ctx, arg2, activity_id="my_activity:2"),
)
__init__(func, retry_policy=None)
¶
Initialize activity wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | | The async or sync function to wrap | required |
| retry_policy | | Optional retry policy for this activity. If None, uses the default policy from EddaApp. | None |
activity(func=None, *, retry_policy=None)
¶
Decorator for defining activities (atomic units of work) with automatic retry.
Activities can be async or sync functions that take a WorkflowContext as the first parameter, followed by any other parameters. Sync functions are executed in a thread pool to avoid blocking the event loop.
Activities are automatically wrapped in a transaction, ensuring that activity execution, history recording, and event sending are atomic. Each retry attempt is executed in an independent transaction.
When using ctx.session to access the Edda-managed session, all operations (activity execution, history recording, event sending) use that shared session, ensuring atomicity within a single transaction.
For non-idempotent operations (e.g., external API calls), place them in Activities to leverage result caching during replay. For operations that can be safely re-executed during replay, place them directly in the Workflow function.
Example
@activity  # Sync activity (no async/await)
def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    # Your business logic here (executed in a thread pool)
    return {"reservation_id": "123"}

@activity  # Async activity (recommended for I/O-bound operations)
async def reserve_inventory_async(ctx: WorkflowContext, order_id: str) -> dict:
    # Async I/O operations
    return {"reservation_id": "123"}

from edda.retry import RetryPolicy, AGGRESSIVE_RETRY

@activity(retry_policy=AGGRESSIVE_RETRY)  # Custom retry policy
def process_payment(ctx: WorkflowContext, amount: float) -> dict:
    # Fast retries for low-latency services
    return {"status": "completed"}

@activity  # Non-idempotent operations cached during replay
def charge_credit_card(ctx: WorkflowContext, amount: float) -> dict:
    # External API call - result is cached, won't be called again on replay
    # If this fails, automatic retry with exponential backoff
    return {"transaction_id": "txn_123"}

from edda.exceptions import TerminalError

@activity
def validate_user(ctx: WorkflowContext, user_id: str) -> dict:
    user = fetch_user(user_id)  # No await needed for sync
    if not user:
        # Don't retry - user doesn't exist
        raise TerminalError(f"User {user_id} not found")
    return {"user_id": user_id, "name": user.name}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | | Async or sync function to wrap as an activity | None |
| retry_policy | | Optional retry policy for this activity. If None, uses the default policy from EddaApp. | None |
Returns:
| Type | Description |
|---|---|
| | Decorated function that can be called within a workflow |
Raises:
| Type | Description |
|---|---|
| | When all retry attempts are exhausted |
| | For non-retryable errors (no retry attempted) |
Sync activities are executed in a thread pool. For I/O-bound operations (database queries, HTTP requests, etc.), async activities are recommended for better performance.
WorkflowContext¶
Context for workflow execution.
Provides access to workflow instance metadata, storage, history management, and utilities for deterministic replay.
This context is passed to activities and contains all the information needed for execution and replay.
session
property
¶
Get Edda-managed database session for custom database operations.
This property provides access to the current transaction's SQLAlchemy session, allowing you to execute custom database operations (ORM queries, raw SQL, etc.) within the same transaction as Edda's workflow operations.
The session is automatically managed by Edda:

- Commit/rollback happens automatically at the end of @activity
- All operations are atomic (workflow history + your DB operations)
- Transaction safety is guaranteed
Returns:
| Type | Description |
|---|---|
| | AsyncSession managed by Edda's transaction context |
Raises:
| Type | Description |
|---|---|
| | If not inside a transaction (must use @activity or ctx.transaction()) |
Example
@activity
async def create_order(ctx: WorkflowContext, order_id: str, amount: float):
    # Get Edda-managed session
    session = ctx.session

    # Your business logic (same DB as Edda)
    order = Order(order_id=order_id, amount=amount)
    session.add(order)

    # Event publishing (same transaction)
    await send_event_transactional(
        ctx, "order.created", "order-service",
        {"order_id": order_id, "amount": amount}
    )

    # Edda commits automatically (or rolls back on error)
    return {"order_id": order_id, "status": "created"}
Note
- Requires @activity (default) or async with ctx.transaction()
- All operations commit/rollback together atomically
- Your tables must be in the same database as Edda
- Do NOT call session.commit() or session.rollback() manually
storage
property
¶
Get storage backend (internal use only).
Warning
This property is for framework internal use only. Direct storage access may break deterministic replay guarantees. Use WorkflowContext methods instead (transaction(), in_transaction()).
__init__(instance_id, workflow_name, storage, worker_id, is_replaying=False, hooks=None)
¶
Initialize workflow context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Workflow instance ID | required |
| workflow_name | | Name of the workflow | required |
| storage | | Storage backend | required |
| worker_id | | Worker ID holding the lock | required |
| is_replaying | | Whether this is a replay execution | False |
| hooks | | Optional WorkflowHooks implementation for observability | None |
__repr__()
¶
String representation of the context.
in_transaction()
¶
Check if currently in a transaction.
This method is useful for ensuring that transactional operations (like send_event_transactional) are called within a transaction context.
Returns:
| Type | Description |
|---|---|
| | True if inside a transaction context, False otherwise |
Example
if ctx.in_transaction():
    await send_event_transactional(ctx, "order.created", ...)
else:
    logger.warning("Not in transaction, using outbox pattern")
    await send_event_transactional(ctx, "order.created", ...)
recur(**kwargs)
async
¶
Restart the workflow with fresh history (Erlang-style tail recursion).
This method prevents unbounded history growth in long-running loops by:
1. Completing the current workflow instance (marking as "recurred")
2. Archiving the current history (not deleted)
3. Starting a new workflow instance with the provided arguments
4. Linking the new instance to the old one via continued_from
This is similar to Erlang's tail recursion pattern where calling the same
function at the end of a loop prevents stack growth. In Edda, recur()
prevents history growth.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| **kwargs | | Arguments to pass to the new workflow instance. These become the input parameters for the next iteration. | {} |
Raises:
| Type | Description |
|---|---|
| | Always raised to signal the ReplayEngine to handle the recur operation. This exception should not be caught. |
Example
@workflow
async def notification_service(ctx: WorkflowContext, processed_count: int = 0):
    await join_group(ctx, group="order_watchers")

    count = 0
    while True:
        msg = await wait_message(ctx, channel="order.completed")
        await send_notification(ctx, msg.data, activity_id=f"notify:{msg.id}")

        count += 1
        if count >= 1000:
            # Reset history every 1000 iterations
            await ctx.recur(processed_count=processed_count + count)
            # Code after recur() is never executed
Note
- Group memberships are NOT automatically transferred. You must re-join groups in the new iteration if needed.
- The old workflow's history is archived, not deleted.
- The new instance has a continued_from field pointing to the old instance.
- During replay, if recur() was already called, this raises immediately without re-executing previous activities.
register_post_commit(callback)
¶
Register a callback to be executed after the current transaction commits.
The callback will be executed after the top-level transaction commits successfully. If the transaction is rolled back, the callback will NOT be executed. This is useful for deferring side effects (like message delivery) until after the transaction has been committed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | | An async function to call after commit | required |
Raises:
| Type | Description |
|---|---|
| | If not in a transaction |
Example
async with ctx.transaction():
    # Save order to database
    await ctx.storage.append_history(...)

    # Defer message delivery until after commit
    async def deliver_notifications():
        await notify_subscribers(order_id)

    ctx.register_post_commit(deliver_notifications)
transaction()
async
¶
Create a transactional context for atomic operations.
This context manager allows you to execute multiple storage operations within a single database transaction. All operations will be committed together, or rolled back together if an exception occurs.
Example
async with ctx.transaction():
    # All operations here are in the same transaction
    await ctx.storage.append_history(...)
    await send_event_transactional(ctx, ...)
    # If any operation fails, all changes are rolled back
Yields:
| Type | Description |
|---|---|
| | None |
Raises:
| Type | Description |
|---|---|
| | If any operation within the transaction fails, the transaction is rolled back and the exception is re-raised |
Instance Search¶
find_instances¶
Find workflow instances with filtering support.
This is a high-level API for querying workflow instances by various criteria, including input parameter values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_filters | | Filter by input data values. Keys are JSON paths, values are expected values (exact match). Example: {"order_id": "ORD-123"} | None |
| status | | Filter by workflow status (e.g., "running", "completed") | None |
| workflow_name | | Filter by workflow name (partial match, case-insensitive) | None |
| instance_id | | Filter by instance ID (partial match, case-insensitive) | None |
| started_after | | Filter instances started after this datetime (inclusive) | None |
| started_before | | Filter instances started before this datetime (inclusive) | None |
| limit | | Maximum number of instances to return per page | 50 |
| page_token | | Cursor for pagination (from previous response) | None |
Returns:
| Type | Description |
|---|---|
| | Dictionary containing the matching instances (exposed under the "instances" key, as shown in the Example) and pagination metadata |
Example
Find all instances with order_id = "ORD-123"¶
result = await app.find_instances(input_filters={"order_id": "ORD-123"})
for instance in result["instances"]:
    print(f"{instance['instance_id']}: {instance['status']}")
Find running instances with specific customer¶
result = await app.find_instances(
    input_filters={"customer_id": "CUST-456"},
    status="running",
)
Timer Functions¶
sleep¶
Pause workflow execution for a specified duration.
This is a durable sleep - the workflow will be resumed after the specified time even if the worker restarts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| seconds | | Duration to sleep in seconds | required |
| timer_id | | Optional unique ID for this timer (auto-generated if not provided) | None |
Example
@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str):
    await create_order(ctx, order_id, activity_id="create:1")
    await sleep(ctx, 60)  # Wait 60 seconds for payment
    await check_payment(ctx, order_id, activity_id="check:1")
sleep_until¶
Pause workflow execution until a specific time.
This is a durable sleep - the workflow will be resumed at the specified time even if the worker restarts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| target_time | | Absolute time to wake up (must be timezone-aware) | required |
| timer_id | | Optional unique ID for this timer (auto-generated if not provided) | None |
Example
from datetime import datetime, timedelta, UTC
@workflow
async def scheduled_report(ctx: WorkflowContext, report_id: str):
    # Schedule for tomorrow at 9 AM
    tomorrow_9am = datetime.now(UTC).replace(hour=9, minute=0, second=0)
    tomorrow_9am += timedelta(days=1)
    await sleep_until(ctx, tomorrow_9am)
    await generate_report(ctx, report_id, activity_id="generate:1")
Events¶
wait_event¶
Wait for a CloudEvent to arrive.
This function pauses the workflow execution until a matching CloudEvent is received. During replay, it returns the cached event data and metadata.
Internally, this uses the Channel-based Message Queue with event_type as the channel name. CloudEvents metadata is preserved in the message metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| event_type | | CloudEvent type to wait for (e.g., "payment.completed") | required |
| timeout_seconds | | Optional timeout in seconds | None |
| model | | Optional Pydantic model class to convert event data to | None |
| event_id | | Optional event identifier (auto-generated if not provided) | None |
Returns:
| Type | Description |
|---|---|
| | ReceivedEvent object containing event data and CloudEvents metadata. If model is provided, ReceivedEvent.data will be a Pydantic model instance. |
Note
Events are delivered to workflows that are subscribed to the event_type channel. Use subscribe(ctx, event_type) before calling wait_event() or let it auto-subscribe.
Raises:
| Type | Description |
|---|---|
| | During normal execution, raised to pause the workflow |
| | If the timeout is reached |
Example
Without Pydantic (dict access)¶
@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str):
    await subscribe(ctx, "payment.completed", mode="broadcast")
    payment_event = await wait_event(ctx, "payment.completed")
    amount = payment_event.data["amount"]
    order_id = payment_event.data["order_id"]
With Pydantic (type-safe access)¶
@workflow
async def order_workflow_typed(ctx: WorkflowContext, order_id: str):
    await subscribe(ctx, "payment.completed", mode="broadcast")
    payment_event = await wait_event(
        ctx,
        event_type="payment.completed",
        model=PaymentCompleted
    )
    # Type-safe access with IDE completion
    amount = payment_event.data.amount
send_event¶
Send a CloudEvent to Knative Broker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_type | | CloudEvent type (e.g., "order.created") | required |
| source | | CloudEvent source (e.g., "order-service") | required |
| data | | Event payload (JSON dict or Pydantic model) | required |
| broker_url | | Knative Broker URL | 'http://broker-ingress.knative-eventing.svc.cluster.local' |
| datacontenttype | | Content type (defaults to "application/json") | None |
Raises:
| Type | Description |
|---|---|
| | If the HTTP request fails |
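A hedged call sketch using the parameters documented above; the broker URL shown is the documented default and is illustrative.

```python
# Hedged sketch: publishing a CloudEvent to a Knative Broker via send_event().
# Parameter names follow the table above; the call is shown awaited on the
# assumption that send_event is a coroutine like the rest of this API.
await send_event(
    event_type="order.created",
    source="order-service",
    data={"order_id": "ORD-123", "amount": 100},
    broker_url="http://broker-ingress.knative-eventing.svc.cluster.local",
)
```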
ReceivedEvent¶
Represents a CloudEvent received by a workflow.
This class provides structured access to both the event payload (data) and CloudEvents metadata (type, source, id, time, etc.).
Attributes:
| Name | Type | Description |
|---|---|---|
| data | | The event payload (JSON dict or Pydantic model) |
| type | | CloudEvent type (e.g., "payment.completed") |
| source | | CloudEvent source (e.g., "payment-service") |
| id | | Unique event identifier |
| time | | Event timestamp (ISO 8601 format) |
| | | Content type of the data (typically "application/json") |
| | | Subject of the event (optional CloudEvents extension) |
| | | Additional CloudEvents extension attributes |
Example
Without Pydantic model¶
event = await wait_event(ctx, "payment.completed")
amount = event.data["amount"]
order_id = event.data["order_id"]
With Pydantic model (type-safe)¶
event = await wait_event(ctx, "payment.completed", model=PaymentCompleted)
amount = event.data.amount      # Type-safe access
order_id = event.data.order_id  # IDE completion
Access CloudEvents metadata¶
event_source = event.source
event_time = event.time
event_id = event.id
EventTimeoutError¶
Bases:
Exception raised when wait_event() times out.
This exception is raised when an event does not arrive within the specified timeout period. The workflow can catch this exception to handle timeout scenarios gracefully.
Example
try:
    event = await wait_event(ctx, "payment.completed", timeout_seconds=60)
except EventTimeoutError:
    # Handle timeout - maybe send reminder or cancel order
    await send_notification("Payment timeout")
Channel-based Messaging¶
subscribe¶
Subscribe to a channel for receiving messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| channel | | Channel name to subscribe to | required |
| mode | | Subscription mode: "broadcast" = all subscribers receive all messages (fan-out pattern); "competing" = each message goes to only one subscriber (work queue pattern); "direct" = receive messages sent via send_to() to this instance | 'broadcast' |
Raises:
| Type | Description |
|---|---|
| | If the channel is already configured with a different mode |
| | If mode is not 'broadcast', 'competing', or 'direct' |
The "direct" mode is syntactic sugar that subscribes to "channel:instance_id" internally, allowing simpler code when receiving direct messages:
# Instead of this:
direct_channel = f"notifications:{ctx.instance_id}"
await subscribe(ctx, direct_channel, mode="broadcast")
msg = await receive(ctx, direct_channel)
# You can write:
await subscribe(ctx, "notifications", mode="direct")
msg = await receive(ctx, "notifications")
Example
@workflow
async def event_handler(ctx: WorkflowContext, id: str):
    # Subscribe to order events (all handlers receive all events)
    await subscribe(ctx, "order.events", mode="broadcast")

    while True:
        event = await receive(ctx, "order.events")
        await handle_event(ctx, event.data, activity_id=f"handle:{event.id}")
        await ctx.recur()

@workflow
async def job_worker(ctx: WorkflowContext, worker_id: str):
    # Subscribe to job queue (each job processed by one worker)
    await subscribe(ctx, "jobs", mode="competing")

    while True:
        job = await receive(ctx, "jobs")
        await execute_job(ctx, job.data, activity_id=f"job:{job.id}")
        await ctx.recur()

@workflow
async def direct_receiver(ctx: WorkflowContext, id: str):
    # Subscribe to receive direct messages via send_to()
    await subscribe(ctx, "notifications", mode="direct")

    msg = await receive(ctx, "notifications")
    print(f"Received: {msg.data}")
unsubscribe¶
Unsubscribe from a channel.
Note: Workflows are automatically unsubscribed from all channels when they complete, fail, or are cancelled. Explicit unsubscribe is usually not necessary.
For channels subscribed with mode="direct", use the original channel name (not the transformed "channel:instance_id" form).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| channel | | Channel name to unsubscribe from | required |
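A short hedged sketch of explicit unsubscription; as noted above, workflows are unsubscribed automatically on completion, so this is rarely needed.

```python
# Hedged sketch: explicitly leaving a channel mid-workflow.
await subscribe(ctx, "order.events", mode="broadcast")
event = await receive(ctx, "order.events")
await unsubscribe(ctx, "order.events")  # pass the original channel name, even for mode="direct"
```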
receive¶
Receive a message from a channel.
This function blocks (pauses the workflow) until a message is available on the channel. Messages are queued persistently, so messages published before this function is called will still be received.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| channel | | Channel name to receive from | required |
| timeout_seconds | | Optional timeout in seconds | None |
| message_id | | Optional ID for concurrent waiting (auto-generated if not provided) | None |
Returns:
| Type | Description |
|---|---|
| | ChannelMessage object containing data and metadata |
Raises:
| Type | Description |
|---|---|
| | Raised to pause the workflow (caught by the ReplayEngine) |
| | If the timeout expires before a message arrives |
Example
@workflow
async def consumer(ctx: WorkflowContext, id: str):
    await subscribe(ctx, "tasks", mode="competing")

    while True:
        msg = await receive(ctx, "tasks")
        await process(ctx, msg.data, activity_id=f"process:{msg.id}")
        await ctx.recur()
publish¶
Publish a message to a channel.
Can be called from within a workflow (with WorkflowContext) or from external code (with StorageProtocol directly).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx_or_storage | | Workflow context or storage backend | required |
| channel | | Channel name to publish to | required |
| data | | Message payload (dict or bytes) | required |
| metadata | | Optional metadata | None |
| target_instance_id | | If provided, only deliver to this specific instance (Point-to-Point delivery). If None, deliver to all waiting subscribers (Pub/Sub delivery). | None |
| worker_id | | Optional worker ID for the Lock-First pattern (required for the CloudEvents HTTP handler) | None |
Returns:
| Type | Description |
|---|---|
| | Message ID of the published message |
Example
From within a workflow¶
@workflow
async def order_processor(ctx: WorkflowContext, order_id: str):
    result = await process_order(ctx, order_id, activity_id="process:1")
    await publish(ctx, "order.completed", {"order_id": order_id})
    return result
From external code (e.g., HTTP handler)¶
async def api_handler(request):
    message_id = await publish(app.storage, "jobs", {"task": "process"})
    return {"message_id": message_id}
Point-to-Point delivery (CloudEvents with eddainstanceid)¶
await publish(
    storage, "payment.completed", {"amount": 100},
    target_instance_id="order-123", worker_id="worker-1"
)
send_to¶
Send a message directly to a specific workflow instance.
This is useful for workflow-to-workflow communication where the target instance ID is known.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context (source workflow) | required |
| instance_id | | Target workflow instance ID | required |
| channel | | Channel name (defaults to the direct-message channel) | '__direct__' |
| data | | Message payload | required |
| metadata | | Optional metadata | None |
|
Returns:
| Type | Description |
|---|---|
| | True if delivered, False if no workflow is waiting |
Example
@workflow
async def approver(ctx: WorkflowContext, request_id: str):
    decision = await review(ctx, request_id, activity_id="review:1")
    await send_to(ctx, instance_id=request_id, data={"approved": decision})
ChannelMessage¶
A message received from a channel.
Attributes:
| Name | Type | Description |
|---|---|---|
| id | | Unique message identifier |
| channel | | Channel name this message was received on |
| data | | Message payload (dict or bytes) |
| metadata | | Optional metadata (source, timestamp, etc.) |
| | | When the message was published |
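A hedged access sketch: msg.id and msg.data appear in the channel examples above; treating the payload as a dict is an assumption for this sketch.

```python
# Hedged sketch: reading fields of a received ChannelMessage.
msg = await receive(ctx, "order.events")
print(msg.id)                     # unique message identifier
order_id = msg.data["order_id"]   # payload (dict or bytes); dict assumed here
```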
Compensation¶
compensation¶
Compensation (Saga compensation) module for Edda framework.
This module provides compensation transaction support for implementing the Saga pattern with automatic rollback on failure.
CompensationAction
¶
Represents a compensation action that should be executed on rollback.
Compensation actions are stored in LIFO order (stack) and executed in reverse order when a workflow fails.
__init__(func, args, kwargs, name)
¶
Initialize a compensation action.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | | The compensation function to execute | required |
| args | | Positional arguments for the function | required |
| kwargs | | Keyword arguments for the function | required |
| name | | Human-readable name for this compensation | required |
__repr__()
¶
String representation of the compensation action.
execute()
async
¶
Execute the compensation action.
Raises:
| Type | Description |
|---|---|
| | Any exception raised by the compensation function |
clear_compensations(ctx)
async
¶
Clear all registered compensation actions.
This is called when a workflow completes successfully and no longer needs the registered compensations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
compensation(func)
¶
Decorator to register a compensation function in the global registry.
This automatically registers the function when the module is imported, making it available for execution across all worker processes in a multi-process environment (e.g., tsuno, gunicorn).
Usage
@compensation
async def cancel_reservation(ctx: WorkflowContext, reservation_id: str):
    await cancel_api(reservation_id)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | | The compensation function to register | required |
Returns:
| Type | Description |
|---|---|
| | The same function (unmodified) |
execute_compensations(ctx)
async
¶
Execute all registered compensation actions in LIFO order.
This is called automatically when a workflow fails and needs to rollback. Compensations are executed in reverse order (LIFO/stack semantics).
This function sets status to "compensating" before execution to enable crash recovery. The caller is responsible for setting the final status (failed/cancelled) after compensations complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
Raises:
| Type | Description |
|---|---|
| | If any compensation fails (logged but not propagated) |
on_failure(compensation_func)
¶
Decorator to automatically register a compensation function.
This decorator wraps an activity and automatically registers a compensation action when the activity completes successfully.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| compensation_func | | The compensation function to register | required |
Returns:
| Type | Description |
|---|---|
| | Decorator function |
Example
@activity
@on_failure(release_inventory)
async def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = await make_reservation(order_id)
    return {"reservation_id": reservation_id}

@activity
async def release_inventory(ctx: WorkflowContext, reservation_id: str) -> None:
    await cancel_reservation(reservation_id)
register_compensation(ctx, compensation_func, *args, activity_id=None, **kwargs)
async
¶
Register a compensation action to be executed if the workflow fails.
Compensation actions are stored in LIFO order (like a stack) and will be executed in reverse order during rollback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| compensation_func | | The async function to call for compensation | required |
| *args | | Positional arguments to pass to the compensation function | () |
| activity_id | | Activity ID to associate with this compensation (optional) | None |
| **kwargs | | Keyword arguments to pass to the compensation function | {} |
Example
@saga
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    # Execute activity
    result = await reserve_inventory(ctx, order_id, activity_id="reserve:1")

    # Register compensation AFTER activity execution
    await register_compensation(
        ctx,
        release_inventory,
        activity_id="reserve:1",
        reservation_id=result["reservation_id"]
    )

    return result
on_failure¶
Decorator to automatically register a compensation function.
This decorator wraps an activity and automatically registers a compensation action when the activity completes successfully.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| compensation_func | | The compensation function to register | required |
Returns:
| Type | Description |
|---|---|
| | Decorator function |
Example
@activity
@on_failure(release_inventory)
async def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = await make_reservation(order_id)
    return {"reservation_id": reservation_id}

@activity
async def release_inventory(ctx: WorkflowContext, reservation_id: str) -> None:
    await cancel_reservation(reservation_id)
register_compensation¶
Register a compensation action to be executed if the workflow fails.
Compensation actions are stored in LIFO order (like a stack) and will be executed in reverse order during rollback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| compensation_func | | The async function to call for compensation | required |
| *args | | Positional arguments to pass to the compensation function | () |
| activity_id | | Activity ID to associate with this compensation (optional) | None |
| **kwargs | | Keyword arguments to pass to the compensation function | {} |
Example
@saga
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    # Execute activity
    result = await reserve_inventory(ctx, order_id, activity_id="reserve:1")

    # Register compensation AFTER activity execution
    await register_compensation(
        ctx,
        release_inventory,
        activity_id="reserve:1",
        reservation_id=result["reservation_id"]
    )

    return result
Transactional Outbox¶
OutboxRelayer¶
Background relayer for publishing outbox events.
The relayer polls the database for pending events and publishes them to a Message Broker. It implements exponential backoff for retries and graceful shutdown.
Example
storage = SQLiteStorage("saga.db")
relayer = OutboxRelayer(
    storage=storage,
    broker_url="http://broker-ingress.svc.cluster.local/default/default",
    poll_interval=1.0,
    max_retries=3
)
await relayer.start()
__init__(storage, broker_url, poll_interval=1.0, max_retries=3, batch_size=10, max_age_hours=None, wake_event=None)
¶
Initialize the Outbox Relayer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage | | Storage backend for outbox events | required |
| broker_url | | Message Broker URL for publishing events | required |
| poll_interval | | Polling interval in seconds | 1.0 |
| max_retries | | Maximum retry attempts | 3 |
| batch_size | | Number of events to process per batch | 10 |
| max_age_hours | | Maximum event age in hours before expiration (None = disabled). Events older than this are marked as 'expired' and won't be retried. | None |
| wake_event | | Optional asyncio.Event to wake the relayer immediately when new events are added. Used with PostgreSQL LISTEN/NOTIFY integration. | None |
start()
async
¶
Start the background relayer task.
This creates an HTTP client and starts the polling loop in a background task.
stop()
async
¶
Stop the background relayer task gracefully.
This cancels the polling loop and closes the HTTP client.
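A hedged lifecycle sketch combining start() and stop(); the storage class and broker URL are taken from the example above and are illustrative, not required values.

```python
# Hedged sketch: run the relayer alongside the app and stop it on shutdown.
relayer = OutboxRelayer(
    storage=storage,
    broker_url="http://broker-ingress.svc.cluster.local/default/default",
)
await relayer.start()     # begins polling in a background task
try:
    ...  # serve traffic / run workflows
finally:
    await relayer.stop()  # cancels the polling loop and closes the HTTP client
```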
send_event_transactional¶
Send an event using the transactional outbox pattern.
This function writes an event to the outbox table instead of sending it directly. The event will be asynchronously published by the Outbox Relayer.
This ensures that event publishing is atomic with the workflow execution:

- If the workflow fails, the event is not published
- If the workflow succeeds, the event is guaranteed to be published
Example
With dict¶
@activity
async def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = str(uuid.uuid4())
    await send_event_transactional(
        ctx,
        event_type="inventory.reserved",
        event_source="order-service",
        event_data={
            "order_id": order_id,
            "reservation_id": reservation_id,
        }
    )
    return {"reservation_id": reservation_id}
With Pydantic model (automatically converted to JSON)¶
@activity
async def reserve_inventory_typed(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = str(uuid.uuid4())
    event = InventoryReserved(
        order_id=order_id,
        reservation_id=reservation_id,
    )
    await send_event_transactional(
        ctx,
        event_type="inventory.reserved",
        event_source="order-service",
        event_data=event,
    )
    return {"reservation_id": reservation_id}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ctx | | Workflow context | required |
| event_type | | CloudEvent type (e.g., "order.created") | required |
| event_source | | CloudEvent source (e.g., "order-service") | required |
| event_data | | Event payload (JSON dict or Pydantic model) | required |
| content_type | | Content type | 'application/json' |
Returns:
| Type | Description |
|---|---|
| | Event ID (UUID) |
Raises:
| Type | Description |
|---|---|
| | If writing to the outbox fails |
Hooks¶
WorkflowHooks¶
Bases:
Protocol for workflow lifecycle hooks.
Users can implement this protocol to add custom observability, logging, or monitoring to their workflows. All methods are optional - implement only the ones you need.
The framework will check if a hook method exists before calling it, so partial implementations are fully supported.
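A minimal sketch of a partial implementation follows; it uses only methods and signatures documented below, with illustrative logging, and is passed to the application via the hooks parameter of EddaApp.

```python
# Hedged sketch: a partial hooks implementation that only logs workflow
# start and failure. Method signatures follow the reference below.
import logging

logger = logging.getLogger("edda.hooks")

class LoggingHooks:
    async def on_workflow_start(self, instance_id, workflow_name, input_data):
        logger.info("workflow %s started (%s)", workflow_name, instance_id)

    async def on_workflow_failed(self, instance_id, workflow_name, error):
        logger.error("workflow %s failed (%s): %r", workflow_name, instance_id, error)

# Wire it in via the constructor, e.g. EddaApp(..., hooks=LoggingHooks())
```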
on_activity_complete(instance_id, activity_id, activity_name, result, cache_hit)
async
¶
Called after an activity completes successfully.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| activity_id | | Activity ID (e.g., "reserve_inventory:1") | required |
| activity_name | | Name of the activity function | required |
| result | | Return value from the activity | required |
| cache_hit | | True if the result was retrieved from cache (replay) | required |
on_activity_failed(instance_id, activity_id, activity_name, error)
async
¶
Called when an activity fails with an exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| activity_id | | Activity ID (e.g., "reserve_inventory:1") | required |
| activity_name | | Name of the activity function | required |
| error | | Exception that caused the failure | required |
on_activity_retry(instance_id, activity_id, activity_name, error, attempt, delay)
async
¶
Called when an activity is about to be retried after a failure.
This hook is called BEFORE the retry delay (asyncio.sleep), allowing observability tools to track retry attempts in real-time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| activity_id | | Activity ID (e.g., "my_activity:1") | required |
| activity_name | | Name of the activity function | required |
| error | | Exception that caused the failure | required |
| attempt | | Current attempt number (1-indexed, before retry) | required |
| delay | | Backoff delay in seconds before the next retry | required |
on_activity_start(instance_id, activity_id, activity_name, is_replaying)
async
¶
Called before an activity executes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| activity_id | | Activity ID (e.g., "reserve_inventory:1") | required |
| activity_name | | Name of the activity function | required |
| is_replaying | | True if this is a replay (cached result) | required |
on_event_received(instance_id, event_type, event_data)
async
¶
Called when a workflow receives an awaited event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| event_type | | CloudEvents type | required |
| event_data | | Event payload | required |
on_event_sent(event_type, event_source, event_data)
async
¶
Called when an event is sent (transactional outbox).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_type | | CloudEvents type | required |
| event_source | | CloudEvents source | required |
| event_data | | Event payload | required |
on_workflow_cancelled(instance_id, workflow_name)
async
¶
Called when a workflow is cancelled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| workflow_name | | Name of the workflow function | required |
on_workflow_complete(instance_id, workflow_name, result)
async
¶
Called when a workflow completes successfully.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| workflow_name | | Name of the workflow function | required |
| result | | Return value from the workflow | required |
on_workflow_failed(instance_id, workflow_name, error)
async
¶
Called when a workflow fails with an exception.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| workflow_name | | Name of the workflow function | required |
| error | | Exception that caused the failure | required |
on_workflow_start(instance_id, workflow_name, input_data)
async
¶
Called when a workflow starts execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | | Unique workflow instance ID | required |
| workflow_name | | Name of the workflow function | required |
| input_data | | Input parameters passed to the workflow | required |
HooksBase¶
Bases: ,
Abstract base class for WorkflowHooks implementations.
This can be used as a base class for partial implementations, so you don't have to implement all methods.
Example
class MyHooks(HooksBase):
    async def on_workflow_start(self, instance_id, workflow_name, input_data):
        print(f"Workflow started: {workflow_name}")
    # Other methods are no-ops (inherited from HooksBase)
Retry¶
RetryPolicy¶
Retry policy configuration for activities.
Inspired by Restate's retry mechanism with Edda-specific optimizations.
Attributes:
| Name | Type | Description |
|---|---|---|
| initial_interval | | First retry delay in seconds |
| backoff_coefficient | | Exponential backoff multiplier |
| max_interval | | Maximum retry delay in seconds (caps exponential growth) |
| max_attempts | | Maximum retry attempts (None = infinite, use with caution) |
| max_duration | | Maximum total retry duration in seconds (None = infinite) |
| retryable_error_types | | Tuple of exception types to retry |
| non_retryable_error_types | | Tuple of exception types to never retry |
Example
Default policy (5 attempts, exponential backoff)¶
policy = RetryPolicy()
Custom policy¶
policy = RetryPolicy(
    initial_interval=0.5,
    backoff_coefficient=1.5,
    max_attempts=10,
    max_duration=120.0,
)
Infinite retry (Restate-style, use with caution)¶
policy = RetryPolicy(max_attempts=None, max_duration=None)
calculate_delay(attempt)
¶
Calculate backoff delay for given attempt number.
Formula: delay = initial_interval * (backoff_coefficient ^ (attempt - 1)) Capped at max_interval to prevent excessive delays.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| attempt | | Current attempt number (1-indexed) | required |
Returns:
| Type | Description |
|---|---|
| | Delay in seconds (exponential backoff, capped at max_interval) |
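A hedged worked example of the formula above. The initial_interval and backoff_coefficient keyword arguments appear in the RetryPolicy examples earlier; passing max_interval to the constructor is an assumption based on the attribute list, and the numeric values are illustrative rather than library defaults.

```python
# Hedged worked example of delay = initial_interval * backoff_coefficient ** (attempt - 1):
#   attempt 1 -> 1.0 * 2.0**0 = 1.0s
#   attempt 2 -> 1.0 * 2.0**1 = 2.0s
#   attempt 3 -> 1.0 * 2.0**2 = 4.0s
#   attempt 5 -> 1.0 * 2.0**4 = 16.0s, capped at max_interval = 10.0s
policy = RetryPolicy(initial_interval=1.0, backoff_coefficient=2.0, max_interval=10.0)
print(policy.calculate_delay(5))  # expected: 10.0 (capped)
```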
is_retryable(error)
¶
Determine if an error is retryable.
Priority:

1. TerminalError -> always non-retryable
2. non_retryable_error_types -> non-retryable
3. retryable_error_types -> retryable
4. Default: non-retryable (safe default)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error | | Exception to check | required |
Returns:
| Type | Description |
|---|---|
| | True if the error should be retried, False otherwise |
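A hedged classification sketch following the priority order above. The retryable_error_types and non_retryable_error_types names come from the attribute list; passing them as constructor keyword arguments is an assumption.

```python
# Hedged sketch: classifying errors with a custom policy (built-in exception
# types used for illustration).
policy = RetryPolicy(
    retryable_error_types=(ConnectionError, TimeoutError),
    non_retryable_error_types=(ValueError,),
)
policy.is_retryable(ConnectionError("reset"))  # True  - listed as retryable
policy.is_retryable(ValueError("bad input"))   # False - listed as non-retryable
policy.is_retryable(KeyError("other"))         # False - safe default
```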
RetryExhaustedError¶
Bases:
Raised when an activity exhausts all retry attempts.
The original exception can be accessed via the __cause__ attribute.
Attributes:
| Name | Type | Description |
|---|---|---|
| __cause__ | | The original exception (from the last retry attempt) |
Example
try:
    await my_activity(ctx)
except RetryExhaustedError as e:
    # Error message: "Activity failed after 5 attempts: ..."
    print(f"Retries exhausted: {e}")

    # Access original error via __cause__
    original_error = e.__cause__
    if isinstance(original_error, NetworkError):
        logger.error("Network issue detected")
TerminalError¶
Bases:
Raised to indicate a non-retryable error.
Activities can raise this exception to immediately stop retry attempts. This is useful for errors that will never succeed (e.g., invalid input, authorization failure, resource not found).
The original exception is accessible via the __cause__ attribute.
Example
@activity
async def validate_user(ctx: WorkflowContext, user_id: str):
    user = await fetch_user(user_id)
    if not user:
        # Don't retry - user doesn't exist
        raise TerminalError(f"User {user_id} not found")
    return user
WSGI¶
create_wsgi_app¶
Create a WSGI-compatible application from an EddaApp instance.
This function wraps an EddaApp (ASGI) with a2wsgi's ASGIMiddleware, making it compatible with WSGI servers like gunicorn or uWSGI.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| edda_app | | An initialized EddaApp instance | required |
Returns:
| Type | Description |
|---|---|
| | A WSGI-compatible application callable |
Example
Basic usage with EddaApp::
from edda import EddaApp
from edda.wsgi import create_wsgi_app
from edda.storage.sqlalchemy_storage import SQLAlchemyStorage
# Create storage and EddaApp
storage = SQLAlchemyStorage("sqlite:///edda.db")
app = EddaApp(storage=storage)
# Create WSGI application
wsgi_app = create_wsgi_app(app)
Running with gunicorn::
# In your module (e.g., demo_app.py):
from edda import EddaApp
from edda.wsgi import create_wsgi_app
application = EddaApp(...) # ASGI
wsgi_application = create_wsgi_app(application) # WSGI
# Command line:
$ gunicorn demo_app:wsgi_application --workers 4
Running with uWSGI::
$ uwsgi --http :8000 --wsgi-file demo_app.py --callable wsgi_application
Background tasks (auto-resume, timer checks, etc.) will run in each worker process.
For production deployments, ASGI servers (uvicorn, hypercorn) are recommended for better performance with Edda's async architecture. WSGI support is provided for compatibility with existing infrastructure and for users who prefer synchronous programming with sync activities.
See Also
- :class:`edda.app.EddaApp`: The main ASGI application class
- :func:`edda.activity.activity`: Decorator supporting sync activities