Event Waiting & Timers
Romancy provides powerful event waiting capabilities that allow workflows to pause and wait for external events or timers without blocking worker processes. This enables efficient resource utilization and event-driven workflow orchestration.
Overview
Event waiting in Romancy allows workflows to:
- Pause execution and wait for external events
- Release worker processes while waiting (non-blocking)
- Resume automatically when events arrive
- Handle timeouts gracefully
- Wait for specific durations with timers
Key Functions
WaitEvent()
Wait for an external event with optional timeout:
package main
import (
"context"
"time"
"github.com/i2y/romancy"
)
// PaymentResult represents the result of payment processing
type PaymentResult struct {
PaymentID string `json:"payment_id"`
Amount float64 `json:"amount"`
}
var myWorkflow = romancy.DefineWorkflow("my_workflow",
func(ctx *romancy.WorkflowContext, input string) (PaymentResult, error) {
// Wait for an event (with 5-minute timeout)
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
return PaymentResult{}, err
}
// Access event data
paymentID := event.Data["payment_id"].(string)
amount := event.Data["amount"].(float64)
return PaymentResult{
PaymentID: paymentID,
Amount: amount,
}, nil
},
)Sleep()
Sleep for a specific duration:
package main
import (
"context"
"time"
"github.com/i2y/romancy"
)
type SleepResult struct {
Status string `json:"status"`
}
var myWorkflow = romancy.DefineWorkflow("my_workflow",
func(ctx *romancy.WorkflowContext, input string) (SleepResult, error) {
// Sleep for 60 seconds
err := romancy.Sleep(ctx, 60*time.Second)
if err != nil {
return SleepResult{}, err
}
// Continue execution after timer expires
return processTimeout(ctx)
},
)Detailed API Reference
WaitEvent()
func WaitEvent(
ctx *WorkflowContext,
eventType string,
opts ...WaitEventOption,
) (*ReceivedEvent, error)Parameters
ctx: The workflow contexteventType: CloudEvents type to wait for (e.g., “payment.completed”)opts: Optional configuration (timeout, etc.)
Options
WithTimeout(d time.Duration): Set timeout durationWithEventID(id string): Set custom event wait ID (auto-generated if not provided)
Returns
*ReceivedEvent containing:
Data: Event data (map[string]any)Type: CloudEvents typeSource: Event sourceSubject: Optional event subjectTime: Event timestampID: Event ID
Error Handling
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
if errors.Is(err, romancy.ErrTimeout) {
// Handle timeout
return nil, fmt.Errorf("payment timed out")
}
return nil, err
}Example with Type-Safe Struct
package main
import (
"context"
"encoding/json"
"time"
"github.com/i2y/romancy"
)
// PaymentCompleted represents payment completion data
type PaymentCompleted struct {
PaymentID string `json:"payment_id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
}
// PaymentWorkflowResult is the result of the payment workflow
type PaymentWorkflowResult struct {
OrderID string `json:"order_id"`
PaymentID string `json:"payment_id"`
}
var paymentWorkflow = romancy.DefineWorkflow("payment_workflow",
func(ctx *romancy.WorkflowContext, orderID string) (PaymentWorkflowResult, error) {
// Wait for event
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
return PaymentWorkflowResult{}, err
}
// Parse event data into struct
var payment PaymentCompleted
if err := parseEventData(event.Data, &payment); err != nil {
return PaymentWorkflowResult{}, err
}
if payment.Status == "success" {
return PaymentWorkflowResult{
OrderID: orderID,
PaymentID: payment.PaymentID,
}, nil
}
return PaymentWorkflowResult{}, fmt.Errorf("payment failed: %s", payment.Status)
},
)
// parseEventData converts map to struct
func parseEventData(data map[string]any, target any) error {
bytes, err := json.Marshal(data)
if err != nil {
return err
}
return json.Unmarshal(bytes, target)
}Sleep()
func Sleep(
ctx *WorkflowContext,
duration time.Duration,
opts ...SleepOption,
) errorParameters
ctx: The workflow contextduration: Duration to sleepopts: Optional configuration
Options
WithSleepID(id string): Set custom timer ID (auto-generated if not provided)
Example
package main
import (
"context"
"time"
"github.com/i2y/romancy"
)
type ScheduledTaskResult struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
}
var scheduledTask = romancy.DefineWorkflow("scheduled_task",
func(ctx *romancy.WorkflowContext, taskID string) (ScheduledTaskResult, error) {
// Execute immediately
if _, err := performInitialTask.Execute(ctx, taskID); err != nil {
return ScheduledTaskResult{}, err
}
// Sleep for 1 hour
if err := romancy.Sleep(ctx, 1*time.Hour); err != nil {
return ScheduledTaskResult{}, err
}
// Execute after delay
return performDelayedTask.Execute(ctx, taskID)
},
)Common Patterns
Payment Processing with Timeout
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/i2y/romancy"
)
type PaymentInitResult struct {
PaymentID string `json:"payment_id"`
}
type PaymentTimeoutResult struct {
Status string `json:"status"`
}
var paymentWithTimeout = romancy.DefineWorkflow("payment_with_timeout",
func(ctx *romancy.WorkflowContext, orderID string) (PaymentTimeoutResult, error) {
// Initiate payment
paymentResult, err := initiatePayment.Execute(ctx, orderID)
if err != nil {
return PaymentTimeoutResult{}, err
}
paymentID := paymentResult.PaymentID
// Wait for payment completion (5-minute timeout)
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
if errors.Is(err, romancy.ErrTimeout) {
// Timeout occurred - cancel payment and order
if _, err := cancelPayment.Execute(ctx, paymentID); err != nil {
return PaymentTimeoutResult{}, err
}
if _, err := cancelOrder.Execute(ctx, orderID); err != nil {
return PaymentTimeoutResult{}, err
}
return PaymentTimeoutResult{Status: "timeout"}, nil
}
return PaymentTimeoutResult{}, err
}
// Check payment status
if event.Data["status"] == "success" {
if _, err := fulfillOrder.Execute(ctx, orderID); err != nil {
return PaymentTimeoutResult{}, err
}
return PaymentTimeoutResult{Status: "completed"}, nil
}
// Payment failed
if _, err := cancelOrder.Execute(ctx, orderID); err != nil {
return PaymentTimeoutResult{}, err
}
return PaymentTimeoutResult{Status: "payment_failed"}, nil
},
)Approval Workflow
package main
import (
"context"
"errors"
"time"
"github.com/i2y/romancy"
)
type ApprovalResult struct {
Approved bool `json:"approved"`
Approver string `json:"approver"`
Escalated bool `json:"escalated"`
}
var approvalWorkflow = romancy.DefineWorkflow("approval_workflow",
func(ctx *romancy.WorkflowContext, requestID string) (ApprovalResult, error) {
// Send approval request to manager
if _, err := sendApprovalRequest.Execute(ctx, requestID, "manager"); err != nil {
return ApprovalResult{}, err
}
// Wait for manager approval (24 hours)
event, err := romancy.WaitEvent(ctx, "approval.decision",
romancy.WithTimeout(24*time.Hour),
)
if err != nil {
if errors.Is(err, romancy.ErrTimeout) {
// Escalate to director
if _, err := sendApprovalRequest.Execute(ctx, requestID, "director"); err != nil {
return ApprovalResult{}, err
}
// Wait for director approval (24 hours)
event, err = romancy.WaitEvent(ctx, "approval.decision",
romancy.WithTimeout(24*time.Hour),
)
if err != nil {
return ApprovalResult{}, err
}
return ApprovalResult{
Approved: event.Data["approved"].(bool),
Approver: event.Data["approver"].(string),
Escalated: true,
}, nil
}
return ApprovalResult{}, err
}
return ApprovalResult{
Approved: event.Data["approved"].(bool),
Approver: event.Data["approver"].(string),
}, nil
},
)Batch Processing with Delays
package main
import (
"context"
"time"
"github.com/i2y/romancy"
)
// BatchItem represents an item to process
type BatchItem struct {
ID string `json:"id"`
Data string `json:"data"`
}
type BatchResult struct {
Processed int `json:"processed"`
}
type BatchProcessorResult struct {
BatchID string `json:"batch_id"`
Results []BatchResult `json:"results"`
}
var batchProcessor = romancy.DefineWorkflow("batch_processor",
func(ctx *romancy.WorkflowContext, input BatchInput) (BatchProcessorResult, error) {
batchSize := 10
results := make([]BatchResult, 0)
for i := 0; i < len(input.Items); i += batchSize {
end := i + batchSize
if end > len(input.Items) {
end = len(input.Items)
}
batch := input.Items[i:end]
// Process batch
batchResult, err := processBatch.Execute(ctx, batch)
if err != nil {
return BatchProcessorResult{}, err
}
results = append(results, batchResult)
// Sleep between batches (except for last batch)
if end < len(input.Items) {
if err := romancy.Sleep(ctx, 5*time.Second); err != nil {
return BatchProcessorResult{}, err
}
}
}
return BatchProcessorResult{
BatchID: input.BatchID,
Results: results,
}, nil
},
)
type BatchInput struct {
BatchID string `json:"batch_id"`
Items []BatchItem `json:"items"`
}Orchestrating Multiple Services
package main
import (
"context"
"time"
"github.com/i2y/romancy"
)
type ServiceResult struct {
Status string `json:"status"`
Data string `json:"data"`
}
type OrchestrationResult struct {
ServiceA ServiceResult `json:"service_a"`
ServiceB ServiceResult `json:"service_b"`
ServiceC ServiceResult `json:"service_c"`
}
var multiServiceOrchestration = romancy.DefineWorkflow("multi_service_orchestration",
func(ctx *romancy.WorkflowContext, requestID string) (OrchestrationResult, error) {
// Start all services
if _, err := triggerServiceA.Execute(ctx, requestID); err != nil {
return OrchestrationResult{}, err
}
if _, err := triggerServiceB.Execute(ctx, requestID); err != nil {
return OrchestrationResult{}, err
}
if _, err := triggerServiceC.Execute(ctx, requestID); err != nil {
return OrchestrationResult{}, err
}
var result OrchestrationResult
// Wait for Service A
eventA, err := romancy.WaitEvent(ctx, "service.a.completed",
romancy.WithTimeout(10*time.Minute),
)
if err != nil {
return OrchestrationResult{}, err
}
result.ServiceA = ServiceResult{
Status: eventA.Data["status"].(string),
Data: eventA.Data["data"].(string),
}
// Wait for Service B
eventB, err := romancy.WaitEvent(ctx, "service.b.completed",
romancy.WithTimeout(10*time.Minute),
)
if err != nil {
return OrchestrationResult{}, err
}
result.ServiceB = ServiceResult{
Status: eventB.Data["status"].(string),
Data: eventB.Data["data"].(string),
}
// Wait for Service C
eventC, err := romancy.WaitEvent(ctx, "service.c.completed",
romancy.WithTimeout(10*time.Minute),
)
if err != nil {
return OrchestrationResult{}, err
}
result.ServiceC = ServiceResult{
Status: eventC.Data["status"].(string),
Data: eventC.Data["data"].(string),
}
// Return aggregated results
return result, nil
},
)Sending Events to Waiting Workflows
Using CloudEvents HTTP
Send CloudEvents to resume waiting workflows:
package main
import (
"bytes"
"encoding/json"
"net/http"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
func sendPaymentCompletedEvent() error {
// Create CloudEvent
event := cloudevents.NewEvent()
event.SetType("payment.completed")
event.SetSource("payment-service")
event.SetData(cloudevents.ApplicationJSON, map[string]any{
"payment_id": "PAY-123",
"amount": 99.99,
"status": "success",
})
// Convert to JSON
eventBytes, err := json.Marshal(event)
if err != nil {
return err
}
// Send to Romancy
req, err := http.NewRequest("POST", "http://localhost:8001/", bytes.NewBuffer(eventBytes))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/cloudevents+json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}Using curl
# Send CloudEvent via curl
curl -X POST http://localhost:8001/ \
-H "Content-Type: application/cloudevents+json" \
-d '{
"specversion": "1.0",
"type": "payment.completed",
"source": "payment-service",
"id": "evt-001",
"data": {
"payment_id": "PAY-123",
"amount": 99.99,
"status": "success"
}
}'Using send_event() from Activities
From within workflows or activities:
package main
import (
"context"
"github.com/i2y/romancy"
)
type ProcessPaymentResult struct {
Processed bool `json:"processed"`
}
var processPayment = romancy.DefineActivity("process_payment",
func(ctx context.Context, paymentData PaymentData) (ProcessPaymentResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
// Process payment...
// Send event to resume waiting workflows
err := romancy.SendEvent(wfCtx,
"payment.completed",
"payment-processor",
map[string]any{
"payment_id": paymentData.ID,
"amount": paymentData.Amount,
"status": "success",
},
)
if err != nil {
return ProcessPaymentResult{}, err
}
return ProcessPaymentResult{Processed: true}, nil
},
)
type PaymentData struct {
ID string `json:"id"`
Amount float64 `json:"amount"`
}Best Practices
1. Always Set Timeouts
Prevent workflows from waiting indefinitely:
// Good: Has timeout
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(5*time.Minute),
)
// Bad: No timeout (could wait forever)
event, err := romancy.WaitEvent(ctx, "payment.completed")2. Handle Timeout Gracefully
event, err := romancy.WaitEvent(ctx, "approval.decision",
romancy.WithTimeout(1*time.Hour),
)
if err != nil {
if errors.Is(err, romancy.ErrTimeout) {
// Handle timeout scenario
return handleTimeoutScenario(ctx)
}
return nil, err
}
// Process approval3. Use Unique Event Types
Avoid event type collisions:
// Good: Specific event types
event, _ := romancy.WaitEvent(ctx, "order.payment.completed",
romancy.WithTimeout(5*time.Minute),
)
event, _ := romancy.WaitEvent(ctx, "subscription.payment.completed",
romancy.WithTimeout(5*time.Minute),
)
// Bad: Generic event type
event, _ := romancy.WaitEvent(ctx, "completed",
romancy.WithTimeout(5*time.Minute),
) // Too generic4. Include Correlation Data in Event Type
For matching specific events:
type OrderWorkflowResult struct {
PaymentID string `json:"payment_id"`
Amount float64 `json:"amount"`
}
var orderWorkflow = romancy.DefineWorkflow("order_workflow",
func(ctx *romancy.WorkflowContext, orderID string) (OrderWorkflowResult, error) {
// Send order_id as part of event type for correlation
if _, err := initiatePayment.Execute(ctx, orderID); err != nil {
return OrderWorkflowResult{}, err
}
// Wait for event with matching order_id
event, err := romancy.WaitEvent(ctx,
fmt.Sprintf("payment.completed.%s", orderID),
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
return OrderWorkflowResult{}, err
}
return OrderWorkflowResult{
PaymentID: event.Data["payment_id"].(string),
Amount: event.Data["amount"].(float64),
}, nil
},
)5. Consider Idempotency
Ensure events can be safely redelivered:
type ProcessCheckResult struct {
Processed bool `json:"processed"`
}
type IdempotentResult struct {
RequestID string `json:"request_id"`
Status string `json:"status"`
}
var idempotentWorkflow = romancy.DefineWorkflow("idempotent_workflow",
func(ctx *romancy.WorkflowContext, requestID string) (IdempotentResult, error) {
// Check if already processed
result, err := isAlreadyProcessed.Execute(ctx, requestID)
if err != nil {
return IdempotentResult{}, err
}
if result.Processed {
return getPreviousResult.Execute(ctx, requestID)
}
// Wait for event
event, err := romancy.WaitEvent(ctx,
fmt.Sprintf("process.%s", requestID),
romancy.WithTimeout(10*time.Minute),
)
if err != nil {
return IdempotentResult{}, err
}
// Process idempotently
return processIdempotently.Execute(ctx, event.Data)
},
)Advanced Topics
Multiple Event Types
Wait for different event types sequentially:
type MultiEventResult struct {
Status string `json:"status"`
}
var multiEventWorkflow = romancy.DefineWorkflow("multi_event_workflow",
func(ctx *romancy.WorkflowContext, orderID string) (MultiEventResult, error) {
// Option 1: Sequential checks with short timeouts
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(1*time.Minute),
)
if err != nil {
if errors.Is(err, romancy.ErrTimeout) {
// Try waiting for failure event
event, err = romancy.WaitEvent(ctx, "payment.failed",
romancy.WithTimeout(1*time.Minute),
)
if err != nil {
return MultiEventResult{}, err
}
} else {
return MultiEventResult{}, err
}
}
// Process based on event type
if event.Type == "payment.completed" {
return MultiEventResult{Status: "success"}, nil
}
return MultiEventResult{Status: "failed"}, nil
},
)Event Filtering
Filter events based on data:
type FilteredEventResult struct {
Action string `json:"action"`
Status string `json:"status"`
}
var filteredEventWorkflow = romancy.DefineWorkflow("filtered_event_workflow",
func(ctx *romancy.WorkflowContext, userID string) (FilteredEventResult, error) {
// Wait for event specific to this user
event, err := romancy.WaitEvent(ctx,
fmt.Sprintf("user.action.%s", userID),
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
return FilteredEventResult{}, err
}
// Additional filtering can be done after receiving
if event.Data["action"] == "purchase" {
return processPurchase.Execute(ctx, event.Data)
}
return FilteredEventResult{Action: "skipped"}, nil
},
)Cancellation
Workflows waiting for events can be cancelled:
package main
import (
"context"
"net/http"
"github.com/i2y/romancy"
)
// Cancel via HTTP API
// POST /cancel/{instance_id}
// Or programmatically
func cancelWaitingWorkflow(ctx context.Context, app *romancy.App, instanceID string) error {
return app.CancelWorkflow(ctx, instanceID)
}Monitoring and Debugging
Viewer UI
The Romancy Viewer shows waiting workflows:
- Status:
waiting_for_eventorwaiting_for_timer - Event type being waited for
- Timeout information
- Time elapsed
Logging
Add logging for debugging:
package main
import (
"context"
"log"
"time"
"github.com/i2y/romancy"
)
type LoggedResult struct {
EventID string `json:"event_id"`
}
var loggedWorkflow = romancy.DefineWorkflow("logged_workflow",
func(ctx *romancy.WorkflowContext, input string) (LoggedResult, error) {
log.Printf("Waiting for payment event, instance: %s", ctx.InstanceID())
event, err := romancy.WaitEvent(ctx, "payment.completed",
romancy.WithTimeout(5*time.Minute),
)
if err != nil {
log.Printf("Wait event error: %v", err)
return LoggedResult{}, err
}
log.Printf("Received event: %s with data: %v", event.ID, event.Data)
return LoggedResult{EventID: event.ID}, nil
},
)Using Lifecycle Hooks
package main
import (
"context"
"log"
"github.com/i2y/romancy"
)
type DebugHooks struct {
romancy.HooksBase
}
func (h *DebugHooks) OnEventReceived(ctx context.Context, instanceID, eventType string, eventData any) {
log.Printf("Event received for workflow %s: type=%s", instanceID, eventType)
}
func main() {
app := romancy.NewApp(
romancy.WithDatabase("workflow.db"),
romancy.WithHooks(&DebugHooks{}),
)
// ...
}Performance Considerations
Worker Release
- Waiting workflows don’t consume worker resources
- Workers can handle other workflows while waiting
- Scales to thousands of waiting workflows
Event Matching
- Event type matching is exact (string comparison)
- Consider using hierarchical event types for flexibility
- Database indexes on event_type improve performance
Timer Precision
- Timers checked every 10 seconds by default
- Maximum 10-second delay in timer expiration
- Adequate for most workflow scheduling needs
Related Topics
- CloudEvents HTTP Binding - CloudEvents integration
- Workflows and Activities - Basic workflow concepts
- Saga Pattern - Compensation and rollback
- Examples: Event Waiting - Practical examples