API Reference

Core

EddaApp

ASGI/WSGI compatible workflow application with distributed execution support.

This is the main entry point for the Edda framework. It handles:
  • CloudEvents HTTP endpoint
  • Event routing and workflow triggering
  • Distributed locking and coordination
  • Storage management

__call__(scope, receive, send) async

ASGI interface.

Parameters:

Name Type Description Default
scope dict[str, Any]

ASGI scope dictionary

required
receive Callable[[], Any]

Async function to receive messages

required
send Callable[[dict[str, Any]], Any]

Async function to send messages

required
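Because EddaApp is itself an ASGI callable, it can be served directly by any ASGI server. A minimal sketch (the module name main.py is hypothetical):

# main.py
from edda import EddaApp

app = EddaApp(service_name="order-service", db_url="sqlite:///workflow.db")

# Serve with an ASGI server, for example:
#   uvicorn main:app --port 8080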

__init__(service_name, db_url, outbox_enabled=False, broker_url=None, hooks=None, default_retry_policy=None, message_retention_days=7, pool_size=5, max_overflow=10, pool_timeout=30, pool_recycle=3600, pool_pre_ping=True, use_listen_notify=None, notify_fallback_interval=30, max_workflows_per_batch=10, leader_heartbeat_interval=15, leader_lease_duration=45)

Initialize Edda application.

Parameters:

Name Type Description Default
service_name str

Service name for distributed execution (e.g., "order-service")

required
db_url str

Database URL (e.g., "sqlite:///workflow.db")

required
outbox_enabled bool

Enable transactional outbox pattern

False
broker_url str | None

Broker URL for outbox publishing. Required if outbox_enabled=True.

None
hooks WorkflowHooks | None

Optional WorkflowHooks implementation for observability

None
default_retry_policy RetryPolicy | None

Default retry policy for all activities. If None, uses DEFAULT_RETRY_POLICY (5 attempts, exponential backoff). Can be overridden per-activity using @activity(retry_policy=...).

None
message_retention_days int

Number of days to retain channel messages before automatic cleanup. Defaults to 7 days. Messages older than this will be deleted by a background task running every hour.

7
pool_size int

Number of connections to keep open in the pool (default: 5). Ignored for SQLite. For production, consider 20+.

5
max_overflow int

Maximum number of connections to create above pool_size (default: 10). Ignored for SQLite. For production, consider 40+.

10
pool_timeout int

Seconds to wait for a connection from the pool (default: 30). Ignored for SQLite.

30
pool_recycle int

Seconds before a connection is recycled (default: 3600). Helps prevent stale connections. Ignored for SQLite.

3600
pool_pre_ping bool

If True, test connections before use (default: True). Helps detect disconnected connections. Ignored for SQLite.

True
use_listen_notify bool | None

Enable PostgreSQL LISTEN/NOTIFY for instant notifications. None (default) = auto-detect (enabled for PostgreSQL, disabled for others). True = force enable (raises error if not PostgreSQL). False = force disable (use polling only).

None
notify_fallback_interval int

Polling interval in seconds when NOTIFY is enabled. Used as backup for missed notifications. Default: 30 seconds. SQLite/MySQL always use their default polling intervals.

30
max_workflows_per_batch int | Literal['auto', 'auto:cpu']

Maximum workflows to process per resume cycle.
  • int: Fixed batch size (default: 10)
  • "auto": Scale 10-100 based on queue depth
  • "auto:cpu": Scale 10-100 based on CPU utilization (requires psutil)

10
leader_heartbeat_interval int

Interval in seconds for leader heartbeat (default: 15). Controls how often workers attempt to become/maintain leadership.

15
leader_lease_duration int

Duration in seconds for leader lease (default: 45). If leader fails to heartbeat within this time, another worker takes over.

45
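A construction sketch combining several of the options above; the values are illustrative only, and the PostgreSQL URL form is an assumption based on the SQLAlchemy-style db_url parameter:

from edda import EddaApp

app = EddaApp(
    service_name="order-service",
    db_url="postgresql+asyncpg://user:pass@localhost/edda",  # assumed URL form
    outbox_enabled=True,
    broker_url="http://broker-ingress.knative-eventing.svc.cluster.local",
    pool_size=20,      # production guidance above: 20+
    max_overflow=40,   # production guidance above: 40+
    max_workflows_per_batch="auto",
)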

find_instances(*, input_filters=None, status=None, workflow_name=None, instance_id=None, started_after=None, started_before=None, limit=50, page_token=None) async

Find workflow instances with filtering support.

This is a high-level API for querying workflow instances by various criteria, including input parameter values.

Parameters:

Name Type Description Default
input_filters dict[str, Any] | None

Filter by input data values. Keys are JSON paths, values are expected values (exact match). Example: {"order_id": "ORD-123"}

None
status str | None

Filter by workflow status (e.g., "running", "completed")

None
workflow_name str | None

Filter by workflow name (partial match, case-insensitive)

None
instance_id str | None

Filter by instance ID (partial match, case-insensitive)

None
started_after datetime | None

Filter instances started after this datetime (inclusive)

None
started_before datetime | None

Filter instances started before this datetime (inclusive)

None
limit int

Maximum number of instances to return per page (default: 50)

50
page_token str | None

Cursor for pagination (from previous response)

None

Returns:

Type Description
dict[str, Any]

Dictionary containing:

  • instances: List of matching workflow instances
  • next_page_token: Cursor for the next page, or None if no more pages
  • has_more: Boolean indicating if there are more pages

Example

Find all instances with order_id = "ORD-123"

result = await app.find_instances(input_filters={"order_id": "ORD-123"})
for instance in result["instances"]:
    print(f"{instance['instance_id']}: {instance['status']}")

Find running instances with specific customer

result = await app.find_instances(
    input_filters={"customer_id": "CUST-456"},
    status="running",
)

handle_cloudevent(event, wait=False) async

Handle incoming CloudEvent.

This will route the event to registered handlers and deliver events to waiting workflows.

By default, handlers are executed as background tasks to avoid blocking the HTTP response. Set wait=True for synchronous execution (useful for testing).

Parameters:

Name Type Description Default
event Any

CloudEvent instance

required
wait bool

If True, wait for handlers to complete before returning. If False (default), execute handlers as background tasks.

False
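For instance, a test can construct a CloudEvent and process it synchronously; this sketch assumes the cloudevents SDK's CloudEvent class:

from cloudevents.http import CloudEvent  # assumed CloudEvents SDK import

event = CloudEvent(
    {"type": "order.created", "source": "order-service"},
    {"order_id": "ORD-123"},
)

# wait=True runs registered handlers before returning (useful in tests)
await app.handle_cloudevent(event, wait=True)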

initialize() async

Initialize the application.

This should be called before the app starts receiving requests.

on_event(event_type, proto_type=None)

Decorator to register an event handler.

Example

@app.on_event("order.created")
async def handle_order_created(event):
    await order_workflow.start(...)

Parameters:

Name Type Description Default
event_type str

CloudEvent type to handle

required
proto_type type[Any] | None

Optional protobuf message type

None

Returns:

Type Description
Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator function

shutdown() async

Shutdown the application and cleanup resources.

This should be called when the app is shutting down.
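When embedding the app in a custom runner instead of a standard ASGI server, initialize() and shutdown() bracket its lifetime; a minimal sketch:

app = EddaApp(service_name="order-service", db_url="sqlite:///workflow.db")

await app.initialize()   # before the first request or workflow
try:
    ...                  # serve requests / run workflows
finally:
    await app.shutdown() # clean up background tasks and connections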

workflow

Workflow module for Edda framework.

This module provides the @workflow decorator for defining workflow functions and managing workflow instances.

RecurException

Bases: Exception

Exception raised to signal that a workflow should recur (restart with fresh history).

This is similar to Erlang's tail recursion pattern - it prevents unbounded history growth in long-running loops by completing the current workflow instance and starting a new one with the provided arguments.

The workflow's history is archived (not deleted) and a new instance is created with a reference to the previous instance (continued_from).

Note

This exception should not be caught by user code. It is handled internally by the ReplayEngine.

Example

@workflow
async def notification_service(ctx: WorkflowContext, processed_count: int = 0):
    await join_group(ctx, group="order_watchers")

    count = 0
    while True:
        msg = await wait_message(ctx, channel="order.completed")
        await send_notification(ctx, msg.data, activity_id=f"notify:{msg.id}")

        count += 1
        if count >= 1000:
            # Reset history by recurring with new state
            await ctx.recur(processed_count=processed_count + count)

__init__(kwargs)

Initialize RecurException.

Parameters:

Name Type Description Default
kwargs dict[str, Any]

Keyword arguments to pass to the new workflow instance

required

Workflow

Wrapper class for workflow functions.

Provides methods for starting and managing workflow instances.

__call__(*args, **kwargs) async

Direct call to the workflow function.

This is typically used during replay by the replay engine.

Parameters:

Name Type Description Default
*args Any

Positional arguments

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Any

Workflow result

__init__(func, event_handler=False, lock_timeout_seconds=None)

Initialize workflow wrapper.

Parameters:

Name Type Description Default
func Callable[..., Any]

The async function to wrap as a workflow

required
event_handler bool

Whether to auto-register as CloudEvent handler

False
lock_timeout_seconds int | None

Default lock timeout for this workflow (None = global default 300s)

None

resume(instance_id, event=None) async

Resume an existing workflow instance.

Parameters:

Name Type Description Default
instance_id str

Workflow instance ID

required
event Any

Optional event that triggered the resume

None

Raises:

Type Description
RuntimeError

If replay engine not initialized

start(lock_timeout_seconds=None, **kwargs) async

Start a new workflow instance.

Pydantic models in kwargs are automatically converted to JSON-compatible dicts for storage. During execution, they will be restored back to Pydantic models based on the workflow function's type hints.

Parameters:

Name Type Description Default
lock_timeout_seconds int | None

Override lock timeout for this specific execution (None = use decorator default or global default 300s)

None
**kwargs Any

Input parameters for the workflow (can include Pydantic models)

{}

Returns:

Type Description
str

Instance ID of the started workflow

Raises:

Type Description
RuntimeError

If replay engine not initialized

get_all_workflows()

Get all registered workflow definitions.

Returns:

Type Description
dict[str, Workflow]

Dictionary mapping workflow names to Workflow instances
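A small sketch of inspecting the registry at startup (the import path is an assumption):

from edda.workflow import get_all_workflows  # assumed import path

for name, wf in get_all_workflows().items():
    print(f"registered workflow: {name} -> {wf!r}")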

set_replay_engine(engine)

Set the global replay engine.

This is called by EddaApp during initialization.

Parameters:

Name Type Description Default
engine Any

ReplayEngine instance

required

workflow(func=None, *, event_handler=False, lock_timeout_seconds=None)

Decorator for defining workflows.

Workflows are the top-level orchestration functions that coordinate multiple activities. They support deterministic replay and can wait for external events.

By default, workflows are NOT automatically registered as CloudEvent handlers. Set event_handler=True to enable automatic CloudEvent handling.

Example

Basic workflow (manual event handling)

@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str, amount: int):
    inventory = await reserve_inventory(ctx, order_id)
    payment = await process_payment(ctx, order_id, amount)
    return {"status": "completed"}

# Start the workflow manually
instance_id = await order_workflow.start(order_id="123", amount=100)

Workflow with automatic CloudEvent handling

@workflow(event_handler=True)
async def auto_workflow(ctx: WorkflowContext, **kwargs):
    # This will automatically handle CloudEvents with type="auto_workflow"
    pass

Workflow with custom lock timeout

@workflow(lock_timeout_seconds=600)
async def long_running_workflow(ctx: WorkflowContext, **kwargs):
    # This workflow will use a 10-minute lock timeout instead of the default 5 minutes
    pass

Parameters:

Name Type Description Default
func F | None

Async function to wrap as a workflow

None
event_handler bool

If True, automatically register as CloudEvent handler

False
lock_timeout_seconds int | None

Default lock timeout for this workflow (None = global default 300s)

None

Returns:

Type Description
F | Callable[[F], F]

Decorated Workflow instance

activity

Activity module for Edda framework.

This module provides the @activity decorator for defining atomic units of work within workflows. Activities are the building blocks of Sagas and support deterministic replay through result caching.

Activity

Wrapper class for activity functions.

Handles execution, result caching during replay, and history recording. Supports automatic retry with exponential backoff.

__call__(ctx, *args, **kwargs) async

Execute the activity with automatic retry.

During replay, returns cached result. During normal execution, executes the function with retry logic and records the result.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
*args Any

Positional arguments for the activity

()
**kwargs Any

Keyword arguments for the activity. Optional: activity_id (str) - explicit activity ID.
  • Auto-generated by default (format: "{function_name}:{counter}")
  • Manual specification required ONLY for concurrent execution (asyncio.gather, async for, etc.)
  • For sequential execution, rely on auto-generation

{}

Returns:

Type Description
Any

Activity result

Raises:

Type Description
RetryExhaustedError

When all retry attempts are exhausted

TerminalError

For non-retryable errors

WorkflowCancelledException

When workflow is cancelled

Example

Sequential execution (auto-generated IDs - recommended)::

result1 = await my_activity(ctx, arg1)  # Auto: "my_activity:1"
result2 = await my_activity(ctx, arg2)  # Auto: "my_activity:2"

Concurrent execution (manual IDs - required)::

results = await asyncio.gather(
    my_activity(ctx, arg1, activity_id="my_activity:1"),
    my_activity(ctx, arg2, activity_id="my_activity:2"),
)

__init__(func, retry_policy=None)

Initialize activity wrapper.

Parameters:

Name Type Description Default
func Callable[..., Any]

The async or sync function to wrap

required
retry_policy RetryPolicy | None

Optional retry policy for this activity. If None, uses the default policy from EddaApp.

None

activity(func=None, *, retry_policy=None)

Decorator for defining activities (atomic units of work) with automatic retry.

Activities can be async or sync functions that take a WorkflowContext as the first parameter, followed by any other parameters. Sync functions are executed in a thread pool to avoid blocking the event loop.

Activities are automatically wrapped in a transaction, ensuring that activity execution, history recording, and event sending are atomic. Each retry attempt is executed in an independent transaction.

When using ctx.session to access the Edda-managed session, all operations (activity execution, history recording, event sending) use that shared session, ensuring atomicity within a single transaction.

For non-idempotent operations (e.g., external API calls), place them in Activities to leverage result caching during replay. For operations that can be safely re-executed during replay, place them directly in the Workflow function.

Example

@activity  # Sync activity (no async/await)
def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    # Your business logic here (executed in thread pool)
    return {"reservation_id": "123"}

@activity  # Async activity (recommended for I/O-bound operations)
async def reserve_inventory_async(ctx: WorkflowContext, order_id: str) -> dict:
    # Async I/O operations
    return {"reservation_id": "123"}

from edda.retry import RetryPolicy, AGGRESSIVE_RETRY

@activity(retry_policy=AGGRESSIVE_RETRY)  # Custom retry policy
def process_payment(ctx: WorkflowContext, amount: float) -> dict:
    # Fast retries for low-latency services
    return {"status": "completed"}

@activity  # Non-idempotent operations cached during replay
def charge_credit_card(ctx: WorkflowContext, amount: float) -> dict:
    # External API call - result is cached, won't be called again on replay
    # If this fails, automatic retry with exponential backoff
    return {"transaction_id": "txn_123"}

from edda.exceptions import TerminalError

@activity
def validate_user(ctx: WorkflowContext, user_id: str) -> dict:
    user = fetch_user(user_id)  # No await needed for sync
    if not user:
        # Don't retry - user doesn't exist
        raise TerminalError(f"User {user_id} not found")
    return {"user_id": user_id, "name": user.name}

Parameters:

Name Type Description Default
func F | None

Async or sync function to wrap as an activity

None
retry_policy RetryPolicy | None

Optional retry policy for this activity. If None, uses the default policy from EddaApp.

None

Returns:

Type Description
F | Callable[[F], F]

Decorated function that can be called within a workflow

Raises:

Type Description
RetryExhaustedError

When all retry attempts are exhausted

TerminalError

For non-retryable errors (no retry attempted)

Sync activities are executed in a thread pool. For I/O-bound operations (database queries, HTTP requests, etc.), async activities are recommended for better performance.

WorkflowContext

Context for workflow execution.

Provides access to workflow instance metadata, storage, history management, and utilities for deterministic replay.

This context is passed to activities and contains all the information needed for execution and replay.

session property

Get Edda-managed database session for custom database operations.

This property provides access to the current transaction's SQLAlchemy session, allowing you to execute custom database operations (ORM queries, raw SQL, etc.) within the same transaction as Edda's workflow operations.

The session is automatically managed by Edda:
  • Commit/rollback happens automatically at the end of @activity
  • All operations are atomic (workflow history + your DB operations)
  • Transaction safety is guaranteed

Returns:

Type Description
AsyncSession

AsyncSession managed by Edda's transaction context

Raises:

Type Description
RuntimeError

If not inside a transaction (must use @activity or ctx.transaction())

Example

@activity
async def create_order(ctx: WorkflowContext, order_id: str, amount: float):
    # Get Edda-managed session
    session = ctx.session

    # Your business logic (same DB as Edda)
    order = Order(order_id=order_id, amount=amount)
    session.add(order)

    # Event publishing (same transaction)
    await send_event_transactional(
        ctx, "order.created", "order-service",
        {"order_id": order_id, "amount": amount}
    )

    # Edda commits automatically (or rolls back on error)
    return {"order_id": order_id, "status": "created"}
Note
  • Requires @activity (default) or async with ctx.transaction()
  • All operations commit/rollback together atomically
  • Your tables must be in the same database as Edda
  • Do NOT call session.commit() or session.rollback() manually

storage property

Get storage backend (internal use only).

Warning

This property is for framework internal use only. Direct storage access may break deterministic replay guarantees. Use WorkflowContext methods instead (transaction(), in_transaction()).

__init__(instance_id, workflow_name, storage, worker_id, is_replaying=False, hooks=None)

Initialize workflow context.

Parameters:

Name Type Description Default
instance_id str

Workflow instance ID

required
workflow_name str

Name of the workflow

required
storage StorageProtocol

Storage backend

required
worker_id str

Worker ID holding the lock

required
is_replaying bool

Whether this is a replay execution

False
hooks Any

Optional WorkflowHooks implementation for observability

None

__repr__()

String representation of the context.

in_transaction()

Check if currently in a transaction.

This method is useful for ensuring that transactional operations (like send_event_transactional) are called within a transaction context.

Returns:

Type Description
bool

True if inside a transaction context, False otherwise

Example

if ctx.in_transaction():
    await send_event_transactional(ctx, "order.created", ...)
else:
    logger.warning("Not in transaction, using outbox pattern")
    await send_event_transactional(ctx, "order.created", ...)

recur(**kwargs) async

Restart the workflow with fresh history (Erlang-style tail recursion).

This method prevents unbounded history growth in long-running loops by:
  1. Completing the current workflow instance (marking it as "recurred")
  2. Archiving the current history (not deleting it)
  3. Starting a new workflow instance with the provided arguments
  4. Linking the new instance to the old one via continued_from

This is similar to Erlang's tail recursion pattern where calling the same function at the end of a loop prevents stack growth. In Edda, recur() prevents history growth.

Parameters:

Name Type Description Default
**kwargs Any

Arguments to pass to the new workflow instance. These become the input parameters for the next iteration.

{}

Raises:

Type Description
RecurException

Always raised to signal the ReplayEngine to handle the recur operation. This exception should not be caught.

Example

@workflow
async def notification_service(ctx: WorkflowContext, processed_count: int = 0):
    await join_group(ctx, group="order_watchers")

    count = 0
    while True:
        msg = await wait_message(ctx, channel="order.completed")
        await send_notification(ctx, msg.data, activity_id=f"notify:{msg.id}")

        count += 1
        if count >= 1000:
            # Reset history every 1000 iterations
            await ctx.recur(processed_count=processed_count + count)
            # Code after recur() is never executed

Note
  • Group memberships are NOT automatically transferred. You must re-join groups in the new iteration if needed.
  • The old workflow's history is archived, not deleted.
  • The new instance has a continued_from field pointing to the old instance.
  • During replay, if recur() was already called, this raises immediately without re-executing previous activities.

register_post_commit(callback)

Register a callback to be executed after the current transaction commits.

The callback will be executed after the top-level transaction commits successfully. If the transaction is rolled back, the callback will NOT be executed. This is useful for deferring side effects (like message delivery) until after the transaction has been committed.

Parameters:

Name Type Description Default
callback Callable[[], Awaitable[None]]

An async function to call after commit.

required

Raises:

Type Description
RuntimeError

If not in a transaction.

Example

async with ctx.transaction():
    # Save order to database
    await ctx.storage.append_history(...)

    # Defer message delivery until after commit
    async def deliver_notifications():
        await notify_subscribers(order_id)

    ctx.register_post_commit(deliver_notifications)

transaction() async

Create a transactional context for atomic operations.

This context manager allows you to execute multiple storage operations within a single database transaction. All operations will be committed together, or rolled back together if an exception occurs.

Example

async with ctx.transaction():
    # All operations here are in the same transaction
    await ctx.storage.append_history(...)
    await send_event_transactional(ctx, ...)
    # If any operation fails, all changes are rolled back

Yields:

Type Description
AsyncIterator[None]

None

Raises:

Type Description
Exception

If any operation within the transaction fails, the transaction is rolled back and the exception is re-raised


find_instances

Find workflow instances with filtering support.

This is a high-level API for querying workflow instances by various criteria, including input parameter values.

Parameters:

Name Type Description Default
input_filters dict[str, Any] | None

Filter by input data values. Keys are JSON paths, values are expected values (exact match). Example: {"order_id": "ORD-123"}

None
status str | None

Filter by workflow status (e.g., "running", "completed")

None
workflow_name str | None

Filter by workflow name (partial match, case-insensitive)

None
instance_id str | None

Filter by instance ID (partial match, case-insensitive)

None
started_after datetime | None

Filter instances started after this datetime (inclusive)

None
started_before datetime | None

Filter instances started before this datetime (inclusive)

None
limit int

Maximum number of instances to return per page (default: 50)

50
page_token str | None

Cursor for pagination (from previous response)

None

Returns:

Type Description
dict[str, Any]

Dictionary containing:

  • instances: List of matching workflow instances
  • next_page_token: Cursor for the next page, or None if no more pages
  • has_more: Boolean indicating if there are more pages

Example

Find all instances with order_id = "ORD-123"

result = await app.find_instances(input_filters={"order_id": "ORD-123"})
for instance in result["instances"]:
    print(f"{instance['instance_id']}: {instance['status']}")

Find running instances with specific customer

result = await app.find_instances(
    input_filters={"customer_id": "CUST-456"},
    status="running",
)


Timer Functions

sleep

Pause workflow execution for a specified duration.

This is a durable sleep - the workflow will be resumed after the specified time even if the worker restarts.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
seconds int

Duration to sleep in seconds

required
timer_id str | None

Optional unique ID for this timer (auto-generated if not provided)

None
Example

@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str):
    await create_order(ctx, order_id, activity_id="create:1")
    await sleep(ctx, 60)  # Wait 60 seconds for payment
    await check_payment(ctx, order_id, activity_id="check:1")

sleep_until

Pause workflow execution until a specific time.

This is a durable sleep - the workflow will be resumed at the specified time even if the worker restarts.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
target_time datetime

Absolute time to wake up (must be timezone-aware)

required
timer_id str | None

Optional unique ID for this timer (auto-generated if not provided)

None
Example

from datetime import datetime, timedelta, UTC

@workflow
async def scheduled_report(ctx: WorkflowContext, report_id: str):
    # Schedule for tomorrow at 9 AM
    tomorrow_9am = datetime.now(UTC).replace(hour=9, minute=0, second=0)
    tomorrow_9am += timedelta(days=1)
    await sleep_until(ctx, tomorrow_9am)
    await generate_report(ctx, report_id, activity_id="generate:1")


Events

wait_event

Wait for a CloudEvent to arrive.

This function pauses the workflow execution until a matching CloudEvent is received. During replay, it returns the cached event data and metadata.

Internally, this uses the Channel-based Message Queue with event_type as the channel name. CloudEvents metadata is preserved in the message metadata.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
event_type str

CloudEvent type to wait for (e.g., "payment.completed")

required
timeout_seconds int | None

Optional timeout in seconds

None
model type[Any] | None

Optional Pydantic model class to convert event data to

None
event_id str | None

Optional event identifier (auto-generated if not provided)

None

Returns:

Type Description
ReceivedEvent

ReceivedEvent object containing event data and CloudEvents metadata.

If model is provided, ReceivedEvent.data will be a Pydantic model instance.

Note

Events are delivered to workflows that are subscribed to the event_type channel. Use subscribe(ctx, event_type) before calling wait_event() or let it auto-subscribe.

Raises:

Type Description
WaitForChannelMessageException

During normal execution to pause the workflow

EventTimeoutError

If timeout is reached

Example

Without Pydantic (dict access)

@workflow
async def order_workflow(ctx: WorkflowContext, order_id: str):
    await subscribe(ctx, "payment.completed", mode="broadcast")
    payment_event = await wait_event(ctx, "payment.completed")
    amount = payment_event.data["amount"]
    order_id = payment_event.data["order_id"]

With Pydantic (type-safe access)

@workflow
async def order_workflow_typed(ctx: WorkflowContext, order_id: str):
    await subscribe(ctx, "payment.completed", mode="broadcast")
    payment_event = await wait_event(
        ctx,
        event_type="payment.completed",
        model=PaymentCompleted
    )
    # Type-safe access with IDE completion
    amount = payment_event.data.amount

send_event

Send a CloudEvent to Knative Broker.

Parameters:

Name Type Description Default
event_type str

CloudEvent type (e.g., "order.created")

required
source str

CloudEvent source (e.g., "order-service")

required
data dict[str, Any] | Any

Event payload (JSON dict or Pydantic model)

required
broker_url str

Knative Broker URL

'http://broker-ingress.knative-eventing.svc.cluster.local'
datacontenttype str | None

Content type (defaults to "application/json")

None

Raises:

Type Description
HTTPError

If the HTTP request fails

Example

With dict

await send_event("order.created", "order-service", {"order_id": "123"})

With Pydantic model (automatically converted to JSON)

order = OrderCreated(order_id="123", amount=99.99)
await send_event("order.created", "order-service", order)

ReceivedEvent

Represents a CloudEvent received by a workflow.

This class provides structured access to both the event payload (data) and CloudEvents metadata (type, source, id, time, etc.).

Attributes:

Name Type Description
data dict[str, Any] | Any

The event payload (JSON dict or Pydantic model)

type str

CloudEvent type (e.g., "payment.completed")

source str

CloudEvent source (e.g., "payment-service")

id str

Unique event identifier

time str | None

Event timestamp (ISO 8601 format)

datacontenttype str | None

Content type of the data (typically "application/json")

subject str | None

Subject of the event (optional CloudEvents extension)

extensions dict[str, Any]

Additional CloudEvents extension attributes

Example

Without Pydantic model

event = await wait_event(ctx, "payment.completed") amount = event.data["amount"] order_id = event.data["order_id"]

With Pydantic model (type-safe)

event = await wait_event(ctx, "payment.completed", model=PaymentCompleted) amount = event.data.amount # Type-safe access order_id = event.data.order_id # IDE completion

Access CloudEvents metadata

event_source = event.source
event_time = event.time
event_id = event.id

EventTimeoutError

Bases: Exception

Exception raised when wait_event() times out.

This exception is raised when an event does not arrive within the specified timeout period. The workflow can catch this exception to handle timeout scenarios gracefully.

Example

try:
    event = await wait_event(ctx, "payment.completed", timeout_seconds=60)
except EventTimeoutError:
    # Handle timeout - maybe send reminder or cancel order
    await send_notification("Payment timeout")


Channel-based Messaging

subscribe

Subscribe to a channel for receiving messages.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
channel str

Channel name to subscribe to

required
mode str

Subscription mode:
  • "broadcast": All subscribers receive all messages (fan-out pattern)
  • "competing": Each message goes to only one subscriber (work queue pattern)
  • "direct": Receive messages sent via send_to() to this instance

'broadcast'

Raises:

Type Description
ChannelModeConflictError

If the channel is already configured with a different mode

ValueError

If mode is not 'broadcast', 'competing', or 'direct'

The "direct" mode is syntactic sugar that subscribes to "channel:instance_id" internally, allowing simpler code when receiving direct messages:

# Instead of this:
direct_channel = f"notifications:{ctx.instance_id}"
await subscribe(ctx, direct_channel, mode="broadcast")
msg = await receive(ctx, direct_channel)

# You can write:
await subscribe(ctx, "notifications", mode="direct")
msg = await receive(ctx, "notifications")
Example

@workflow
async def event_handler(ctx: WorkflowContext, id: str):
    # Subscribe to order events (all handlers receive all events)
    await subscribe(ctx, "order.events", mode="broadcast")

    while True:
        event = await receive(ctx, "order.events")
        await handle_event(ctx, event.data, activity_id=f"handle:{event.id}")
        await ctx.recur()

@workflow
async def job_worker(ctx: WorkflowContext, worker_id: str):
    # Subscribe to job queue (each job processed by one worker)
    await subscribe(ctx, "jobs", mode="competing")

    while True:
        job = await receive(ctx, "jobs")
        await execute_job(ctx, job.data, activity_id=f"job:{job.id}")
        await ctx.recur()

@workflow
async def direct_receiver(ctx: WorkflowContext, id: str):
    # Subscribe to receive direct messages via send_to()
    await subscribe(ctx, "notifications", mode="direct")

    msg = await receive(ctx, "notifications")
    print(f"Received: {msg.data}")

unsubscribe

Unsubscribe from a channel.

Note: Workflows are automatically unsubscribed from all channels when they complete, fail, or are cancelled. Explicit unsubscribe is usually not necessary.

For channels subscribed with mode="direct", use the original channel name (not the transformed "channel:instance_id" form).

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
channel str

Channel name to unsubscribe from

required
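A short sketch of explicit unsubscription, assuming unsubscribe is awaited like the other channel helpers:

@workflow
async def temporary_listener(ctx: WorkflowContext, id: str):
    await subscribe(ctx, "order.events", mode="broadcast")
    event = await receive(ctx, "order.events")
    # Stop receiving further messages on this channel
    await unsubscribe(ctx, "order.events")
    return {"last_event_id": event.id}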

receive

Receive a message from a channel.

This function blocks (pauses the workflow) until a message is available on the channel. Messages are queued persistently, so messages published before this function is called will still be received.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
channel str

Channel name to receive from

required
timeout_seconds int | None

Optional timeout in seconds

None
message_id str | None

Optional ID for concurrent waiting (auto-generated if not provided)

None

Returns:

Type Description
ChannelMessage

ChannelMessage object containing data and metadata

Raises:

Type Description
WaitForChannelMessageException

Raised to pause workflow (caught by ReplayEngine)

TimeoutError

If timeout expires before message arrives

Example

@workflow
async def consumer(ctx: WorkflowContext, id: str):
    await subscribe(ctx, "tasks", mode="competing")

    while True:
        msg = await receive(ctx, "tasks")
        await process(ctx, msg.data, activity_id=f"process:{msg.id}")
        await ctx.recur()

publish

Publish a message to a channel.

Can be called from within a workflow (with WorkflowContext) or from external code (with StorageProtocol directly).

Parameters:

Name Type Description Default
ctx_or_storage WorkflowContext | StorageProtocol

Workflow context or storage backend

required
channel str

Channel name to publish to

required
data dict[str, Any] | bytes

Message payload (dict or bytes)

required
metadata dict[str, Any] | None

Optional metadata

None
target_instance_id str | None

If provided, only deliver to this specific instance (Point-to-Point delivery). If None, deliver to all waiting subscribers (Pub/Sub delivery).

None
worker_id str | None

Optional worker ID for Lock-First pattern (required for CloudEvents HTTP handler)

None

Returns:

Type Description
str

Message ID of the published message

Example

From within a workflow

@workflow
async def order_processor(ctx: WorkflowContext, order_id: str):
    result = await process_order(ctx, order_id, activity_id="process:1")
    await publish(ctx, "order.completed", {"order_id": order_id})
    return result

From external code (e.g., HTTP handler)

async def api_handler(request):
    message_id = await publish(app.storage, "jobs", {"task": "process"})
    return {"message_id": message_id}

Point-to-Point delivery (CloudEvents with eddainstanceid)

await publish(
    storage, "payment.completed", {"amount": 100},
    target_instance_id="order-123", worker_id="worker-1"
)

send_to

Send a message directly to a specific workflow instance.

This is useful for workflow-to-workflow communication where the target instance ID is known.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context (source workflow)

required
instance_id str

Target workflow instance ID

required
channel str

Channel name (defaults to "__direct__" for direct messages)

'__direct__'
data dict[str, Any] | bytes

Message payload

required
metadata dict[str, Any] | None

Optional metadata

None

Returns:

Type Description
bool

True if delivered, False if no workflow waiting

Example

@workflow
async def approver(ctx: WorkflowContext, request_id: str):
    decision = await review(ctx, request_id, activity_id="review:1")
    await send_to(ctx, instance_id=request_id, data={"approved": decision})

ChannelMessage

A message received from a channel.

Attributes:

Name Type Description
id str

Unique message identifier

channel str

Channel name this message was received on

data dict[str, Any] | bytes

Message payload (dict or bytes)

metadata dict[str, Any]

Optional metadata (source, timestamp, etc.)

published_at datetime

When the message was published
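After receive() returns, the documented fields can be read directly; a small sketch:

msg = await receive(ctx, "jobs")

print(msg.id)            # unique message identifier
print(msg.channel)       # "jobs"
print(msg.data)          # dict or bytes payload
print(msg.metadata)      # metadata dict (source, timestamp, etc.)
print(msg.published_at)  # datetime when the message was published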


Compensation

compensation

Compensation (Saga compensation) module for Edda framework.

This module provides compensation transaction support for implementing the Saga pattern with automatic rollback on failure.

CompensationAction

Represents a compensation action that should be executed on rollback.

Compensation actions are stored in LIFO order (stack) and executed in reverse order when a workflow fails.

__init__(func, args, kwargs, name)

Initialize a compensation action.

Parameters:

Name Type Description Default
func Callable[..., Any]

The compensation function to execute

required
args tuple[Any, ...]

Positional arguments for the function

required
kwargs dict[str, Any]

Keyword arguments for the function

required
name str

Human-readable name for this compensation

required

__repr__()

String representation of the compensation action.

execute() async

Execute the compensation action.

Raises:

Type Description
Exception

Any exception raised by the compensation function

clear_compensations(ctx) async

Clear all registered compensation actions.

This is called when a workflow completes successfully and no longer needs the registered compensations.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required

compensation(func)

Decorator to register a compensation function in the global registry.

This automatically registers the function when the module is imported, making it available for execution across all worker processes in a multi-process environment (e.g., tsuno, gunicorn).

Usage

@compensation
async def cancel_reservation(ctx: WorkflowContext, reservation_id: str):
    await cancel_api(reservation_id)

Parameters:

Name Type Description Default
func F

The compensation function to register

required

Returns:

Type Description
F

The same function (unmodified)

execute_compensations(ctx) async

Execute all registered compensation actions in LIFO order.

This is called automatically when a workflow fails and needs to rollback. Compensations are executed in reverse order (LIFO/stack semantics).

This function sets status to "compensating" before execution to enable crash recovery. The caller is responsible for setting the final status (failed/cancelled) after compensations complete.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required

Raises:

Type Description
Exception

If any compensation fails (logged but not propagated)

on_failure(compensation_func)

Decorator to automatically register a compensation function.

This decorator wraps an activity and automatically registers a compensation action when the activity completes successfully.

Parameters:

Name Type Description Default
compensation_func Callable[..., Any]

The compensation function to register

required

Returns:

Type Description
Callable[[F], F]

Decorator function

Example

@activity
@on_failure(release_inventory)
async def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = await make_reservation(order_id)
    return {"reservation_id": reservation_id}

@activity
async def release_inventory(ctx: WorkflowContext, reservation_id: str) -> None:
    await cancel_reservation(reservation_id)

register_compensation(ctx, compensation_func, *args, activity_id=None, **kwargs) async

Register a compensation action to be executed if the workflow fails.

Compensation actions are stored in LIFO order (like a stack) and will be executed in reverse order during rollback.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
compensation_func Callable[..., Any]

The async function to call for compensation

required
*args Any

Positional arguments to pass to the compensation function

()
activity_id str | None

Activity ID to associate with this compensation (optional)

None
**kwargs Any

Keyword arguments to pass to the compensation function

{}
Example

@saga
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    # Execute activity
    result = await reserve_inventory(ctx, order_id, activity_id="reserve:1")

    # Register compensation AFTER activity execution
    await register_compensation(
        ctx,
        release_inventory,
        activity_id="reserve:1",
        reservation_id=result["reservation_id"]
    )

    return result

on_failure

Decorator to automatically register a compensation function.

This decorator wraps an activity and automatically registers a compensation action when the activity completes successfully.

Parameters:

Name Type Description Default
compensation_func Callable[..., Any]

The compensation function to register

required

Returns:

Type Description
Callable[[F], F]

Decorator function

Example

@activity
@on_failure(release_inventory)
async def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = await make_reservation(order_id)
    return {"reservation_id": reservation_id}

@activity
async def release_inventory(ctx: WorkflowContext, reservation_id: str) -> None:
    await cancel_reservation(reservation_id)

register_compensation

Register a compensation action to be executed if the workflow fails.

Compensation actions are stored in LIFO order (like a stack) and will be executed in reverse order during rollback.

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
compensation_func Callable[..., Any]

The async function to call for compensation

required
*args Any

Positional arguments to pass to the compensation function

()
activity_id str | None

Activity ID to associate with this compensation (optional)

None
**kwargs Any

Keyword arguments to pass to the compensation function

{}
Example

@saga
async def order_workflow(ctx: WorkflowContext, order_id: str) -> dict:
    # Execute activity
    result = await reserve_inventory(ctx, order_id, activity_id="reserve:1")

    # Register compensation AFTER activity execution
    await register_compensation(
        ctx,
        release_inventory,
        activity_id="reserve:1",
        reservation_id=result["reservation_id"]
    )

    return result


Transactional Outbox

OutboxRelayer

Background relayer for publishing outbox events.

The relayer polls the database for pending events and publishes them to a Message Broker. It implements exponential backoff for retries and graceful shutdown.

Example

storage = SQLiteStorage("saga.db")
relayer = OutboxRelayer(
    storage=storage,
    broker_url="http://broker-ingress.svc.cluster.local/default/default",
    poll_interval=1.0,
    max_retries=3
)
await relayer.start()

__init__(storage, broker_url, poll_interval=1.0, max_retries=3, batch_size=10, max_age_hours=None, wake_event=None)

Initialize the Outbox Relayer.

Parameters:

Name Type Description Default
storage StorageProtocol

Storage backend for outbox events

required
broker_url str

Message Broker URL for publishing events

required
poll_interval float

Polling interval in seconds (default: 1.0)

1.0
max_retries int

Maximum retry attempts (default: 3)

3
batch_size int

Number of events to process per batch (default: 10)

10
max_age_hours float | None

Maximum event age in hours before expiration (default: None, disabled) Events older than this are marked as 'expired' and won't be retried.

None
wake_event Event | None

Optional asyncio.Event to wake the relayer immediately when new events are added. Used with PostgreSQL LISTEN/NOTIFY integration.

None

start() async

Start the background relayer task.

This creates an HTTP client and starts the polling loop in a background task.

stop() async

Stop the background relayer task gracefully.

This cancels the polling loop and closes the HTTP client.

send_event_transactional

Send an event using the transactional outbox pattern.

This function writes an event to the outbox table instead of sending it directly. The event will be asynchronously published by the Outbox Relayer.

This ensures that event publishing is atomic with the workflow execution:
  • If the workflow fails, the event is not published
  • If the workflow succeeds, the event is guaranteed to be published

Example

With dict

@activity
async def reserve_inventory(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = str(uuid.uuid4())
    await send_event_transactional(
        ctx,
        event_type="inventory.reserved",
        event_source="order-service",
        event_data={
            "order_id": order_id,
            "reservation_id": reservation_id,
        }
    )
    return {"reservation_id": reservation_id}

With Pydantic model (automatically converted to JSON)

@activity
async def reserve_inventory_typed(ctx: WorkflowContext, order_id: str) -> dict:
    reservation_id = str(uuid.uuid4())
    event = InventoryReserved(
        order_id=order_id,
        reservation_id=reservation_id,
    )
    await send_event_transactional(
        ctx,
        event_type="inventory.reserved",
        event_source="order-service",
        event_data=event,
    )
    return {"reservation_id": reservation_id}

Parameters:

Name Type Description Default
ctx WorkflowContext

Workflow context

required
event_type str

CloudEvent type (e.g., "order.created")

required
event_source str

CloudEvent source (e.g., "order-service")

required
event_data dict[str, Any] | BaseModel

Event payload (JSON dict or Pydantic model)

required
content_type str

Content type (defaults to "application/json")

'application/json'

Returns:

Type Description
str

Event ID (UUID)

Raises:

Type Description
Exception

If writing to outbox fails


Hooks

WorkflowHooks

Bases: Protocol

Protocol for workflow lifecycle hooks.

Users can implement this protocol to add custom observability, logging, or monitoring to their workflows. All methods are optional - implement only the ones you need.

The framework will check if a hook method exists before calling it, so partial implementations are fully supported.
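For example, a partial implementation that only logs workflow lifecycle events might look like the following sketch (the logger setup is an assumption):

import logging

logger = logging.getLogger("edda.hooks")

class LoggingHooks:
    async def on_workflow_start(self, instance_id, workflow_name, input_data):
        logger.info("workflow %s started (%s)", workflow_name, instance_id)

    async def on_workflow_failed(self, instance_id, workflow_name, error):
        logger.error("workflow %s failed: %s", workflow_name, error)

# Passed to the application via the hooks parameter:
# app = EddaApp(service_name="order-service", db_url="sqlite:///workflow.db", hooks=LoggingHooks())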

on_activity_complete(instance_id, activity_id, activity_name, result, cache_hit) async

Called after an activity completes successfully.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
activity_id str

Activity ID (e.g., "reserve_inventory:1")

required
activity_name str

Name of the activity function

required
result Any

Return value from the activity

required
cache_hit bool

True if result was retrieved from cache (replay)

required

on_activity_failed(instance_id, activity_id, activity_name, error) async

Called when an activity fails with an exception.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
activity_id str

Activity ID (e.g., "reserve_inventory:1")

required
activity_name str

Name of the activity function

required
error Exception

Exception that caused the failure

required

on_activity_retry(instance_id, activity_id, activity_name, error, attempt, delay) async

Called when an activity is about to be retried after a failure.

This hook is called BEFORE the retry delay (asyncio.sleep), allowing observability tools to track retry attempts in real-time.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
activity_id str

Activity ID (e.g., "my_activity:1")

required
activity_name str

Name of the activity function

required
error Exception

Exception that caused the failure

required
attempt int

Current attempt number (1-indexed, before retry)

required
delay float

Backoff delay in seconds before the next retry

required

on_activity_start(instance_id, activity_id, activity_name, is_replaying) async

Called before an activity executes.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
activity_id str

Activity ID (e.g., "reserve_inventory:1")

required
activity_name str

Name of the activity function

required
is_replaying bool

True if this is a replay (cached result)

required

on_event_received(instance_id, event_type, event_data) async

Called when a workflow receives an awaited event.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
event_type str

CloudEvents type

required
event_data dict[str, Any]

Event payload

required

on_event_sent(event_type, event_source, event_data) async

Called when an event is sent (transactional outbox).

Parameters:

Name Type Description Default
event_type str

CloudEvents type

required
event_source str

CloudEvents source

required
event_data dict[str, Any]

Event payload

required

on_workflow_cancelled(instance_id, workflow_name) async

Called when a workflow is cancelled.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
workflow_name str

Name of the workflow function

required

on_workflow_complete(instance_id, workflow_name, result) async

Called when a workflow completes successfully.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
workflow_name str

Name of the workflow function

required
result Any

Return value from the workflow

required

on_workflow_failed(instance_id, workflow_name, error) async

Called when a workflow fails with an exception.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
workflow_name str

Name of the workflow function

required
error Exception

Exception that caused the failure

required

on_workflow_start(instance_id, workflow_name, input_data) async

Called when a workflow starts execution.

Parameters:

Name Type Description Default
instance_id str

Unique workflow instance ID

required
workflow_name str

Name of the workflow function

required
input_data dict[str, Any]

Input parameters passed to the workflow

required

HooksBase

Bases: WorkflowHooks, ABC

Abstract base class for WorkflowHooks implementations.

This can be used as a base class for partial implementations, so you don't have to implement all methods.

Example

class MyHooks(HooksBase):
    async def on_workflow_start(self, instance_id, workflow_name, input_data):
        print(f"Workflow started: {workflow_name}")
    # Other methods are no-ops (inherited from HooksBase)


Retry

RetryPolicy

Retry policy configuration for activities.

Inspired by Restate's retry mechanism with Edda-specific optimizations.

Attributes:

Name Type Description
initial_interval float

First retry delay in seconds

backoff_coefficient float

Exponential backoff multiplier

max_interval float

Maximum retry delay in seconds (caps exponential growth)

max_attempts int | None

Maximum retry attempts (None = infinite, use with caution)

max_duration float | None

Maximum total retry duration in seconds (None = infinite)

retryable_error_types tuple[type[Exception], ...]

Tuple of exception types to retry

non_retryable_error_types tuple[type[Exception], ...]

Tuple of exception types to never retry

Example

Default policy (5 attempts, exponential backoff)

policy = RetryPolicy()

Custom policy

policy = RetryPolicy(
    initial_interval=0.5,
    backoff_coefficient=1.5,
    max_attempts=10,
    max_duration=120.0,
)

Infinite retry (Restate-style, use with caution)

policy = RetryPolicy(max_attempts=None, max_duration=None)

calculate_delay(attempt)

Calculate backoff delay for given attempt number.

Formula: delay = initial_interval * (backoff_coefficient ^ (attempt - 1)), capped at max_interval to prevent excessive delays.

Parameters:

Name Type Description Default
attempt int

Current attempt number (1-indexed)

required

Returns:

Type Description
float

Delay in seconds (exponential backoff, capped at max_interval)

Example

Default policy: initial=1.0, coefficient=2.0, max=60.0

Attempt 1: 1.0s
Attempt 2: 2.0s
Attempt 3: 4.0s
Attempt 4: 8.0s
Attempt 5: 16.0s
Attempt 6: 32.0s
Attempt 7: 60.0s (capped)
Attempt 8: 60.0s (capped)

is_retryable(error)

Determine if an error is retryable.

Priority:
  1. TerminalError -> always non-retryable
  2. non_retryable_error_types -> non-retryable
  3. retryable_error_types -> retryable
  4. Default: non-retryable (safe default)

Parameters:

Name Type Description Default
error Exception

Exception to check

required

Returns:

Type Description
bool

True if error should be retried, False otherwise
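A sketch of how error classification might be configured and checked; the exception classes are hypothetical, and passing the error-type tuples to the constructor is an assumption based on the attributes listed above:

from edda.retry import RetryPolicy
from edda.exceptions import TerminalError

class NetworkError(Exception): ...       # hypothetical
class InvalidInputError(Exception): ...  # hypothetical

policy = RetryPolicy(
    retryable_error_types=(NetworkError,),
    non_retryable_error_types=(InvalidInputError,),
)

print(policy.is_retryable(NetworkError("timeout")))         # True
print(policy.is_retryable(InvalidInputError("bad order")))  # False
print(policy.is_retryable(TerminalError("never retry")))    # False, always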

RetryExhaustedError

Bases: Exception

Raised when an activity exhausts all retry attempts.

The original exception can be accessed via the __cause__ attribute.

Attributes:

Name Type Description
__cause__

The original exception (from the last retry attempt)

Example

try:
    await my_activity(ctx)
except RetryExhaustedError as e:
    # Error message: "Activity failed after 5 attempts: ..."
    print(f"Retries exhausted: {e}")

    # Access original error via __cause__
    original_error = e.__cause__
    if isinstance(original_error, NetworkError):
        logger.error("Network issue detected")

TerminalError

Bases: Exception

Raised to indicate a non-retryable error.

Activities can raise this exception to immediately stop retry attempts. This is useful for errors that will never succeed (e.g., invalid input, authorization failure, resource not found).

The original exception is accessible via the __cause__ attribute.

Example

@activity
async def validate_user(ctx: WorkflowContext, user_id: str):
    user = await fetch_user(user_id)
    if not user:
        # Don't retry - user doesn't exist
        raise TerminalError(f"User {user_id} not found")
    return user


WSGI

create_wsgi_app

Create a WSGI-compatible application from an EddaApp instance.

This function wraps an EddaApp (ASGI) with a2wsgi's ASGIMiddleware, making it compatible with WSGI servers like gunicorn or uWSGI.

Parameters:

Name Type Description Default
edda_app EddaApp

An initialized EddaApp instance

required

Returns:

Type Description
Any

A WSGI-compatible application callable

Example

Basic usage with EddaApp::

from edda import EddaApp
from edda.wsgi import create_wsgi_app

# Create the EddaApp (ASGI application)
app = EddaApp(service_name="order-service", db_url="sqlite:///edda.db")

# Create WSGI application
wsgi_app = create_wsgi_app(app)

Running with gunicorn::

# In your module (e.g., demo_app.py):
from edda import EddaApp
from edda.wsgi import create_wsgi_app

application = EddaApp(...)  # ASGI
wsgi_application = create_wsgi_app(application)  # WSGI

# Command line:
$ gunicorn demo_app:wsgi_application --workers 4

Running with uWSGI::

$ uwsgi --http :8000 --wsgi-file demo_app.py --callable wsgi_application

Background tasks (auto-resume, timer checks, etc.) will run in each worker process.

For production deployments, ASGI servers (uvicorn, hypercorn) are recommended for better performance with Edda's async architecture. WSGI support is provided for compatibility with existing infrastructure and for users who prefer synchronous programming with sync activities.

See Also
  • edda.app.EddaApp: The main ASGI application class
  • edda.activity.activity: Decorator supporting sync activities