Channels
Romancy provides a channel-based message-passing system for communication between workflows. It supports patterns such as pub/sub, work queues, and direct messaging between workflow instances.
Overview
Channels allow workflows to:
- Subscribe to named channels with different delivery modes
- Receive messages asynchronously (workflow suspends while waiting)
- Publish messages to all subscribers
- Send directly to specific workflow instances
Delivery Modes
Romancy supports three delivery modes:
| Mode | Description | Use Case |
|---|---|---|
| ModeBroadcast | All subscribers receive every message | Notifications, events |
| ModeCompeting | Each message goes to exactly one subscriber | Work queues, load balancing |
| ModeDirect | Receives messages sent via SendTo to this instance | Point-to-point, request-response |
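To make the difference between the three modes concrete, here is a minimal in-memory sketch. It is not Romancy's implementation: the `channel` and `subscriber` types are hypothetical, and the round-robin choice among competing subscribers is an assumption made only for this illustration (the table above says only that exactly one subscriber receives each message).

```go
package main

import "fmt"

// Mode mirrors the three delivery modes from the table.
// These constants are local to this sketch, not Romancy's types.
type Mode int

const (
	Broadcast Mode = iota
	Competing
	Direct
)

// subscriber is a hypothetical stand-in for a subscribed workflow instance.
type subscriber struct {
	instanceID string
	mode       Mode
	inbox      []string
}

// channel is a toy in-memory channel.
type channel struct {
	subs []*subscriber
	next int // round-robin cursor (assumed selection policy)
}

func (c *channel) subscribe(id string, m Mode) *subscriber {
	s := &subscriber{instanceID: id, mode: m}
	c.subs = append(c.subs, s)
	return s
}

// publish delivers msg to every broadcast subscriber and to
// exactly one competing subscriber.
func (c *channel) publish(msg string) {
	var competing []*subscriber
	for _, s := range c.subs {
		switch s.mode {
		case Broadcast:
			s.inbox = append(s.inbox, msg)
		case Competing:
			competing = append(competing, s)
		}
	}
	if len(competing) > 0 {
		w := competing[c.next%len(competing)]
		w.inbox = append(w.inbox, msg)
		c.next++
	}
}

// sendTo delivers msg only to the direct subscriber whose
// instance ID matches; it reports whether one was found.
func (c *channel) sendTo(id, msg string) bool {
	for _, s := range c.subs {
		if s.mode == Direct && s.instanceID == id {
			s.inbox = append(s.inbox, msg)
			return true
		}
	}
	return false
}

func main() {
	alerts := &channel{}
	a := alerts.subscribe("wf-a", Broadcast)
	b := alerts.subscribe("wf-b", Broadcast)
	alerts.publish("disk full")
	fmt.Println(len(a.inbox), len(b.inbox)) // 1 1

	tasks := &channel{}
	w1 := tasks.subscribe("wf-1", Competing)
	w2 := tasks.subscribe("wf-2", Competing)
	for _, t := range []string{"t1", "t2", "t3"} {
		tasks.publish(t)
	}
	fmt.Println(len(w1.inbox), len(w2.inbox)) // 2 1

	dm := &channel{}
	d := dm.subscribe("wf-12345", Direct)
	dm.sendTo("wf-12345", "hello")
	fmt.Println(len(d.inbox)) // 1
}
```

In the broadcast case both subscribers see the alert, in the competing case the three tasks are split between the two workers, and in the direct case only the addressed instance receives the message.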
Basic Usage
Subscribe to a Channel
import "github.com/i2y/romancy"
var myWorkflow = romancy.DefineWorkflow("my_workflow",
func(ctx *romancy.WorkflowContext, input MyInput) (MyResult, error) {
// Subscribe to receive all messages (broadcast mode)
if err := romancy.Subscribe(ctx, "notifications", romancy.ModeBroadcast); err != nil {
return MyResult{}, err
}
// Or subscribe in competing mode (work queue)
if err := romancy.Subscribe(ctx, "tasks", romancy.ModeCompeting); err != nil {
return MyResult{}, err
}
// ... rest of workflow
return MyResult{}, nil
},
)
Receive Messages
// Define message type
type ChatMessage struct {
From string `json:"from"`
Content string `json:"content"`
Time time.Time `json:"time"`
}
var chatWorkflow = romancy.DefineWorkflow("chat_workflow",
func(ctx *romancy.WorkflowContext, input ChatInput) (ChatResult, error) {
// Subscribe to channel
if err := romancy.Subscribe(ctx, "chat", romancy.ModeBroadcast); err != nil {
return ChatResult{}, err
}
// Wait for a message (workflow suspends here)
msg, err := romancy.Receive[ChatMessage](ctx, "chat")
if err != nil {
return ChatResult{}, err
}
fmt.Printf("Received: %s from %s\n", msg.Data.Content, msg.Data.From)
return ChatResult{LastMessage: msg.Data.Content}, nil
},
)
Receive with Timeout
// Wait for message with timeout
msg, err := romancy.Receive[ChatMessage](ctx, "chat",
romancy.WithReceiveTimeout(30*time.Second),
)
if err != nil {
// Check for timeout
var timeoutErr *romancy.ChannelMessageTimeoutError
if errors.As(err, &timeoutErr) {
fmt.Println("No message received within timeout")
return ChatResult{Status: "timeout"}, nil
}
return ChatResult{}, err
}
Publish Messages
var publisherWorkflow = romancy.DefineWorkflow("publisher",
func(ctx *romancy.WorkflowContext, input PublishInput) (PublishResult, error) {
message := ChatMessage{
From: "system",
Content: input.Message,
Time: time.Now(),
}
// Publish to all subscribers
if err := romancy.Publish(ctx, "chat", message); err != nil {
return PublishResult{}, err
}
return PublishResult{Status: "published"}, nil
},
)
Publish with Metadata
// Attach metadata to messages
err := romancy.Publish(ctx, "events", eventData,
romancy.WithMetadata(map[string]any{
"priority": "high",
"source": "order-service",
}),
)
Send to Specific Instance (SendTo)
SendTo sends a message directly to a specific workflow instance. Subscribe with ModeDirect on the receiver side so that the per-instance channel is set up for you.
// === Receiver Workflow ===
// Subscribe with ModeDirect to receive SendTo messages
// This automatically subscribes to "notifications:instanceID"
if err := romancy.Subscribe(ctx, "notifications", romancy.ModeDirect); err != nil {
return MyResult{}, err
}
// Receive works normally - ModeDirect handles the channel name internally
msg, err := romancy.Receive[MyMessage](ctx, "notifications")
// === Sender Workflow ===
// Send directly to a specific workflow instance
targetInstanceID := "wf-12345"
err := romancy.SendTo(ctx, targetInstanceID, "notifications", message)
This design matches Erlang/Elixir’s mailbox semantics, where each process has its own mailbox identified by its PID.
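The per-instance mailbox idea can be sketched with a map keyed by a derived channel name. This is an illustration only: `directChannelName` and `mailboxes` are hypothetical helpers, and the `channel:instanceID` key format is taken from the comment in the receiver snippet above, so the exact internal format Romancy uses should be treated as an assumption.

```go
package main

import "fmt"

// directChannelName derives a per-instance channel name in the
// "channel:instanceID" form mentioned in the receiver snippet.
// The exact internal format is an assumption of this sketch.
func directChannelName(channel, instanceID string) string {
	return channel + ":" + instanceID
}

// mailboxes maps per-instance channel names to queued messages,
// similar to one mailbox per process in Erlang/Elixir.
type mailboxes map[string][]string

// sendTo appends msg to the mailbox of the target instance.
func (m mailboxes) sendTo(instanceID, channel, msg string) {
	key := directChannelName(channel, instanceID)
	m[key] = append(m[key], msg)
}

// receive pops the oldest message for the given instance, if any.
func (m mailboxes) receive(instanceID, channel string) (string, bool) {
	key := directChannelName(channel, instanceID)
	q := m[key]
	if len(q) == 0 {
		return "", false
	}
	m[key] = q[1:]
	return q[0], true
}

func main() {
	m := mailboxes{}
	m.sendTo("wf-12345", "notifications", "hello")

	// A different instance on the same logical channel sees nothing.
	_, ok := m.receive("wf-99999", "notifications")
	fmt.Println(ok) // false

	msg, ok := m.receive("wf-12345", "notifications")
	fmt.Println(msg, ok) // hello true
}
```

Because the receiver keeps using the plain channel name, the instance ID never has to appear in receiver code; only the sender needs to know which instance it is addressing.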
Unsubscribe
// Remove subscription when no longer needed
if err := romancy.Unsubscribe(ctx, "notifications"); err != nil {
return MyResult{}, err
}
Transactional Message Passing
When Publish or SendTo is called within an activity (inside a database transaction), Romancy ensures transactional consistency:
| Scenario | Behavior |
|---|---|
| Inside TX → Commit | Message saved, delivery after commit |
| Inside TX → Rollback | Message not saved, no delivery |
| Outside TX | Message saved, immediate delivery |
This guarantees that:
- Messages are only delivered if the transaction commits successfully
- A rollback prevents any message delivery, so messages stay consistent with the database state
- Messages are neither duplicated nor lost
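The three scenarios in the table can be modeled with a small buffer-until-commit sketch. The `tx` and `broker` types below are hypothetical and exist only for this illustration; they show the behavior described above, not how Romancy stores or delivers messages internally.

```go
package main

import "fmt"

// tx is a hypothetical stand-in for a database transaction
// that buffers messages published while it is open.
type tx struct {
	pending []string
}

// broker records the messages that were actually delivered.
type broker struct {
	delivered []string
}

// publish queues msg on the transaction when one is given,
// and delivers immediately when called outside a transaction.
func (b *broker) publish(t *tx, msg string) {
	if t != nil {
		t.pending = append(t.pending, msg)
		return
	}
	b.delivered = append(b.delivered, msg)
}

// commit delivers everything queued on the transaction.
func (b *broker) commit(t *tx) {
	b.delivered = append(b.delivered, t.pending...)
	t.pending = nil
}

// rollback discards queued messages without delivering them.
func (b *broker) rollback(t *tx) {
	t.pending = nil
}

func main() {
	b := &broker{}

	// Inside TX, then commit: delivery happens after commit.
	committed := &tx{}
	b.publish(committed, "order-created")
	fmt.Println(len(b.delivered)) // 0
	b.commit(committed)
	fmt.Println(b.delivered) // [order-created]

	// Inside TX, then rollback: nothing is delivered.
	rolledBack := &tx{}
	b.publish(rolledBack, "order-failed")
	b.rollback(rolledBack)
	fmt.Println(b.delivered) // [order-created]

	// Outside TX: immediate delivery.
	b.publish(nil, "heartbeat")
	fmt.Println(b.delivered) // [order-created heartbeat]
}
```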
Example: Transactional Publish
var orderActivity = romancy.DefineActivity("process_order",
func(ctx context.Context, order Order) (OrderResult, error) {
// This runs inside a transaction by default
wfCtx := romancy.GetWorkflowContext(ctx)
// Database operations...
session := wfCtx.Session()
_, err := session.ExecContext(ctx,
"INSERT INTO orders (id, status) VALUES (?, ?)",
order.ID, "processing",
)
if err != nil {
return OrderResult{}, err // Rollback - no message sent
}
// Message is queued but NOT delivered yet
if err := romancy.Publish(wfCtx, "order-events", OrderEvent{
OrderID: order.ID,
Status: "processing",
}); err != nil {
return OrderResult{}, err // Rollback - no message sent
}
// Returning without an error commits the transaction; the message is delivered after commit
return OrderResult{Status: "ok"}, nil
},
)
ReceivedMessage Structure
When receiving a message, you get a ReceivedMessage[T] struct:
type ReceivedMessage[T any] struct {
ID int64 // Message ID
ChannelName string // Channel name
Data T // Your message data
Metadata map[string]any // Optional metadata
SenderInstanceID string // Sender's instance ID (if sent via SendTo)
CreatedAt time.Time // When message was created
}
Complete Example
Here’s a complete example showing consumer and producer workflows:
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/i2y/romancy"
)
// Message types
type TaskMessage struct {
TaskID string `json:"task_id"`
Payload string `json:"payload"`
}
type TaskResult struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
}
// Consumer workflow - processes tasks from queue
var taskConsumer = romancy.DefineWorkflow("task_consumer",
func(ctx *romancy.WorkflowContext, input struct{}) ([]TaskResult, error) {
var results []TaskResult
// Subscribe in competing mode (work queue)
if err := romancy.Subscribe(ctx, "tasks", romancy.ModeCompeting); err != nil {
return nil, err
}
// Process up to 10 tasks
for i := 0; i < 10; i++ {
msg, err := romancy.Receive[TaskMessage](ctx, "tasks",
romancy.WithReceiveTimeout(5*time.Second),
)
if err != nil {
var timeoutErr *romancy.ChannelMessageTimeoutError
if errors.As(err, &timeoutErr) {
break // No more tasks
}
return nil, err
}
log.Printf("Processing task: %s", msg.Data.TaskID)
results = append(results, TaskResult{
TaskID: msg.Data.TaskID,
Status: "completed",
})
}
return results, nil
},
)
// Producer workflow - creates tasks
var taskProducer = romancy.DefineWorkflow("task_producer",
func(ctx *romancy.WorkflowContext, tasks []TaskMessage) (int, error) {
for _, task := range tasks {
if err := romancy.Publish(ctx, "tasks", task); err != nil {
return 0, err
}
}
return len(tasks), nil
},
)
func main() {
app := romancy.NewApp(
romancy.WithDatabase("channels.db"),
romancy.WithWorkerID("worker-1"),
)
ctx := context.Background()
if err := app.Start(ctx); err != nil {
log.Fatal(err)
}
defer app.Shutdown(ctx)
// Start a consumer
consumerID, err := romancy.StartWorkflow(ctx, app, taskConsumer, struct{}{})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Started consumer: %s\n", consumerID)
// Start a producer with tasks
tasks := []TaskMessage{
{TaskID: "task-1", Payload: "data1"},
{TaskID: "task-2", Payload: "data2"},
{TaskID: "task-3", Payload: "data3"},
}
producerID, err := romancy.StartWorkflow(ctx, app, taskProducer, tasks)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Started producer: %s\n", producerID)
// Wait for processing...
time.Sleep(10 * time.Second)
}
Use Cases
Fan-Out (Broadcast)
Send notifications to all interested workflows:
// All subscribers receive the notification
romancy.Subscribe(ctx, "system-alerts", romancy.ModeBroadcast)
romancy.Publish(ctx, "system-alerts", AlertMessage{...})
Work Queue (Competing)
Distribute work across multiple workers:
// Only one subscriber receives each task
romancy.Subscribe(ctx, "work-queue", romancy.ModeCompeting)
msg, _ := romancy.Receive[WorkItem](ctx, "work-queue")
Direct Messaging (SendTo)
Point-to-point communication with specific workflow instances:
// === Receiver Workflow ===
// Subscribe with ModeDirect to receive SendTo messages
romancy.Subscribe(ctx, "messages", romancy.ModeDirect)
// Receive works with the original channel name
msg, _ := romancy.Receive[Message](ctx, "messages")
fmt.Printf("Received from %s: %s\n", msg.Data.From, msg.Data.Content)
// === Sender Workflow ===
// Send directly to a specific instance
romancy.SendTo(ctx, targetInstanceID, "messages", Message{
From: ctx.InstanceID(),
Content: "Hello!",
})
See Also
- Event Waiting: Wait for external CloudEvents
- Workflows & Activities: Core workflow concepts
- Transactional Outbox: Reliable event publishing